| 1 | //! Blocking connection API. |
| 2 | |
| 3 | use enumflags2::BitFlags; |
| 4 | use event_listener::EventListener; |
| 5 | use static_assertions::assert_impl_all; |
| 6 | use std::{io, ops::Deref}; |
| 7 | use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; |
| 8 | use zvariant::ObjectPath; |
| 9 | |
| 10 | use crate::{ |
| 11 | blocking::ObjectServer, |
| 12 | fdo::{ConnectionCredentials, RequestNameFlags, RequestNameReply}, |
| 13 | message::Message, |
| 14 | utils::block_on, |
| 15 | DBusError, Error, Result, |
| 16 | }; |
| 17 | |
| 18 | mod builder; |
| 19 | pub use builder::Builder; |
| 20 | |
| 21 | /// A blocking wrapper of [`zbus::Connection`]. |
| 22 | /// |
| 23 | /// Most of the API is very similar to [`zbus::Connection`], except it's blocking. |
| 24 | #[derive (Debug, Clone)] |
| 25 | #[must_use = "Dropping a `Connection` will close the underlying socket." ] |
| 26 | pub struct Connection { |
| 27 | inner: crate::Connection, |
| 28 | } |
| 29 | |
| 30 | assert_impl_all!(Connection: Send, Sync, Unpin); |
| 31 | |
| 32 | impl Connection { |
| 33 | /// Create a `Connection` to the session/user message bus. |
| 34 | pub fn session() -> Result<Self> { |
| 35 | block_on(crate::Connection::session()).map(Self::from) |
| 36 | } |
| 37 | |
| 38 | /// Create a `Connection` to the system-wide message bus. |
| 39 | pub fn system() -> Result<Self> { |
| 40 | block_on(crate::Connection::system()).map(Self::from) |
| 41 | } |
| 42 | |
| 43 | /// The capacity of the main (unfiltered) queue. |
| 44 | pub fn max_queued(&self) -> usize { |
| 45 | self.inner.max_queued() |
| 46 | } |
| 47 | |
| 48 | /// Set the capacity of the main (unfiltered) queue. |
| 49 | pub fn set_max_queued(mut self, max: usize) { |
| 50 | self.inner.set_max_queued(max) |
| 51 | } |
| 52 | |
| 53 | /// The server's GUID. |
| 54 | pub fn server_guid(&self) -> &str { |
| 55 | self.inner.server_guid() |
| 56 | } |
| 57 | |
| 58 | /// The unique name as assigned by the message bus or `None` if not a message bus connection. |
| 59 | pub fn unique_name(&self) -> Option<&OwnedUniqueName> { |
| 60 | self.inner.unique_name() |
| 61 | } |
| 62 | |
| 63 | /// Send `msg` to the peer. |
| 64 | pub fn send(&self, msg: &Message) -> Result<()> { |
| 65 | block_on(self.inner.send(msg)) |
| 66 | } |
| 67 | |
| 68 | /// Send a method call. |
| 69 | /// |
| 70 | /// Create a method-call message, send it over the connection, then wait for the reply. |
| 71 | /// |
| 72 | /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus |
| 73 | /// error replies are returned as [`Error::MethodError`]. |
| 74 | pub fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( |
| 75 | &self, |
| 76 | destination: Option<D>, |
| 77 | path: P, |
| 78 | iface: Option<I>, |
| 79 | method_name: M, |
| 80 | body: &B, |
| 81 | ) -> Result<Message> |
| 82 | where |
| 83 | D: TryInto<BusName<'d>>, |
| 84 | P: TryInto<ObjectPath<'p>>, |
| 85 | I: TryInto<InterfaceName<'i>>, |
| 86 | M: TryInto<MemberName<'m>>, |
| 87 | D::Error: Into<Error>, |
| 88 | P::Error: Into<Error>, |
| 89 | I::Error: Into<Error>, |
| 90 | M::Error: Into<Error>, |
| 91 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 92 | { |
| 93 | block_on( |
| 94 | self.inner |
| 95 | .call_method(destination, path, iface, method_name, body), |
| 96 | ) |
| 97 | } |
| 98 | |
| 99 | /// Emit a signal. |
| 100 | /// |
| 101 | /// Create a signal message, and send it over the connection. |
| 102 | pub fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( |
| 103 | &self, |
| 104 | destination: Option<D>, |
| 105 | path: P, |
| 106 | iface: I, |
| 107 | signal_name: M, |
| 108 | body: &B, |
| 109 | ) -> Result<()> |
| 110 | where |
| 111 | D: TryInto<BusName<'d>>, |
| 112 | P: TryInto<ObjectPath<'p>>, |
| 113 | I: TryInto<InterfaceName<'i>>, |
| 114 | M: TryInto<MemberName<'m>>, |
| 115 | D::Error: Into<Error>, |
| 116 | P::Error: Into<Error>, |
| 117 | I::Error: Into<Error>, |
| 118 | M::Error: Into<Error>, |
| 119 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 120 | { |
| 121 | block_on( |
| 122 | self.inner |
| 123 | .emit_signal(destination, path, iface, signal_name, body), |
| 124 | ) |
| 125 | } |
| 126 | |
| 127 | /// Reply to a message. |
| 128 | /// |
| 129 | /// Given an existing message (likely a method call), send a reply back to the caller with the |
| 130 | /// given `body`. |
| 131 | pub fn reply<B>(&self, call: &Message, body: &B) -> Result<()> |
| 132 | where |
| 133 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 134 | { |
| 135 | block_on(self.inner.reply(call, body)) |
| 136 | } |
| 137 | |
| 138 | /// Reply an error to a message. |
| 139 | /// |
| 140 | /// Given an existing message (likely a method call), send an error reply back to the caller |
| 141 | /// with the given `error_name` and `body`. |
| 142 | /// |
| 143 | /// Returns the message serial number. |
| 144 | pub fn reply_error<'e, E, B>(&self, call: &Message, error_name: E, body: &B) -> Result<()> |
| 145 | where |
| 146 | B: serde::ser::Serialize + zvariant::DynamicType, |
| 147 | E: TryInto<ErrorName<'e>>, |
| 148 | E::Error: Into<Error>, |
| 149 | { |
| 150 | block_on(self.inner.reply_error(call, error_name, body)) |
| 151 | } |
| 152 | |
| 153 | /// Reply to a method call with an error. |
| 154 | /// |
| 155 | /// Given an existing method call message header, send an error reply back to the caller |
| 156 | /// using one of the standard interface reply types. |
| 157 | /// |
| 158 | /// Returns the message serial number. |
| 159 | pub fn reply_dbus_error( |
| 160 | &self, |
| 161 | call: &zbus::message::Header<'_>, |
| 162 | err: impl DBusError, |
| 163 | ) -> Result<()> { |
| 164 | block_on(self.inner.reply_dbus_error(call, err)) |
| 165 | } |
| 166 | |
| 167 | /// Register a well-known name for this service on the bus. |
| 168 | /// |
| 169 | /// Blocking version of [`crate::Connection::request_name`]. See docs there for more details |
| 170 | /// and caveats. |
| 171 | pub fn request_name<'w, W>(&self, well_known_name: W) -> Result<()> |
| 172 | where |
| 173 | W: TryInto<WellKnownName<'w>>, |
| 174 | W::Error: Into<Error>, |
| 175 | { |
| 176 | block_on(self.inner.request_name(well_known_name)) |
| 177 | } |
| 178 | |
| 179 | /// Register a well-known name for this service on the bus. |
| 180 | /// |
| 181 | /// Blocking version of [`crate::Connection::request_name_with_flags`]. See docs there for more |
| 182 | /// details and caveats. |
| 183 | pub fn request_name_with_flags<'w, W>( |
| 184 | &self, |
| 185 | well_known_name: W, |
| 186 | flags: BitFlags<RequestNameFlags>, |
| 187 | ) -> Result<RequestNameReply> |
| 188 | where |
| 189 | W: TryInto<WellKnownName<'w>>, |
| 190 | W::Error: Into<Error>, |
| 191 | { |
| 192 | block_on(self.inner.request_name_with_flags(well_known_name, flags)) |
| 193 | } |
| 194 | |
| 195 | /// Deregister a previously registered well-known name for this service on the bus. |
| 196 | /// |
| 197 | /// Use this method to deregister a well-known name, registered through |
| 198 | /// [`Connection::request_name`]. |
| 199 | /// |
| 200 | /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with |
| 201 | /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name |
| 202 | /// was not previously registered or already deregistered. |
| 203 | pub fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool> |
| 204 | where |
| 205 | W: TryInto<WellKnownName<'w>>, |
| 206 | W::Error: Into<Error>, |
| 207 | { |
| 208 | block_on(self.inner.release_name(well_known_name)) |
| 209 | } |
| 210 | |
| 211 | /// Checks if `self` is a connection to a message bus. |
| 212 | /// |
| 213 | /// This will return `false` for p2p connections. |
| 214 | pub fn is_bus(&self) -> bool { |
| 215 | self.inner.is_bus() |
| 216 | } |
| 217 | |
| 218 | /// Get a reference to the associated [`ObjectServer`]. |
| 219 | /// |
| 220 | /// The `ObjectServer` is created on-demand. |
| 221 | pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ { |
| 222 | self.inner.sync_object_server(true, None) |
| 223 | } |
| 224 | |
| 225 | /// Get a reference to the underlying async Connection. |
| 226 | pub fn inner(&self) -> &crate::Connection { |
| 227 | &self.inner |
| 228 | } |
| 229 | |
| 230 | /// Get the underlying async Connection, consuming `self`. |
| 231 | pub fn into_inner(self) -> crate::Connection { |
| 232 | self.inner |
| 233 | } |
| 234 | |
| 235 | /// Returns a listener, notified on various connection activity. |
| 236 | /// |
| 237 | /// This function is meant for the caller to implement idle or timeout on inactivity. |
| 238 | pub fn monitor_activity(&self) -> EventListener { |
| 239 | self.inner.monitor_activity() |
| 240 | } |
| 241 | |
| 242 | /// Returns the peer credentials. |
| 243 | /// |
| 244 | /// The fields are populated on the best effort basis. Some or all fields may not even make |
| 245 | /// sense for certain sockets or on certain platforms and hence will be set to `None`. |
| 246 | /// |
| 247 | /// # Caveats |
| 248 | /// |
| 249 | /// Currently `unix_group_ids` and `linux_security_label` fields are not populated. |
| 250 | pub fn peer_credentials(&self) -> io::Result<ConnectionCredentials> { |
| 251 | block_on(self.inner.peer_credentials()) |
| 252 | } |
| 253 | |
| 254 | /// Close the connection. |
| 255 | /// |
| 256 | /// After this call, all reading and writing operations will fail. |
| 257 | pub fn close(self) -> Result<()> { |
| 258 | block_on(self.inner.close()) |
| 259 | } |
| 260 | } |
| 261 | |
| 262 | impl From<crate::Connection> for Connection { |
| 263 | fn from(conn: crate::Connection) -> Self { |
| 264 | Self { inner: conn } |
| 265 | } |
| 266 | } |
| 267 | |
| 268 | #[cfg (feature = "p2p" )] |
| 269 | #[cfg (all(test, unix))] |
| 270 | mod tests { |
| 271 | use event_listener::Listener; |
| 272 | use ntest::timeout; |
| 273 | #[cfg (all(unix, not(feature = "tokio" )))] |
| 274 | use std::os::unix::net::UnixStream; |
| 275 | use std::thread; |
| 276 | use test_log::test; |
| 277 | #[cfg (all(unix, feature = "tokio" ))] |
| 278 | use tokio::net::UnixStream; |
| 279 | #[cfg (all(windows, not(feature = "tokio" )))] |
| 280 | use uds_windows::UnixStream; |
| 281 | |
| 282 | use crate::{ |
| 283 | blocking::{connection::Builder, MessageIterator}, |
| 284 | Guid, |
| 285 | }; |
| 286 | |
| 287 | #[test ] |
| 288 | #[timeout(15000)] |
| 289 | fn unix_p2p() { |
| 290 | let guid = Guid::generate(); |
| 291 | |
| 292 | // Tokio needs us to call the sync function from async context. :shrug: |
| 293 | let (p0, p1) = crate::utils::block_on(async { UnixStream::pair().unwrap() }); |
| 294 | |
| 295 | let (tx, rx) = std::sync::mpsc::channel(); |
| 296 | let server_thread = thread::spawn(move || { |
| 297 | let c = Builder::unix_stream(p0) |
| 298 | .server(guid) |
| 299 | .unwrap() |
| 300 | .p2p() |
| 301 | .build() |
| 302 | .unwrap(); |
| 303 | rx.recv().unwrap(); |
| 304 | let reply = c |
| 305 | .call_method(None::<()>, "/" , Some("org.zbus.p2p" ), "Test" , &()) |
| 306 | .unwrap(); |
| 307 | assert_eq!(reply.to_string(), "Method return" ); |
| 308 | let val: String = reply.body().deserialize().unwrap(); |
| 309 | val |
| 310 | }); |
| 311 | |
| 312 | let c = Builder::unix_stream(p1).p2p().build().unwrap(); |
| 313 | |
| 314 | let listener = c.monitor_activity(); |
| 315 | |
| 316 | let mut s = MessageIterator::from(&c); |
| 317 | tx.send(()).unwrap(); |
| 318 | let m = s.next().unwrap().unwrap(); |
| 319 | assert_eq!(m.to_string(), "Method call Test" ); |
| 320 | c.reply(&m, &("yay" )).unwrap(); |
| 321 | |
| 322 | for _ in s {} |
| 323 | |
| 324 | let val = server_thread.join().expect("failed to join server thread" ); |
| 325 | assert_eq!(val, "yay" ); |
| 326 | |
| 327 | // there was some activity |
| 328 | listener.wait(); |
| 329 | // eventually, nothing happens and it will timeout |
| 330 | loop { |
| 331 | let listener = c.monitor_activity(); |
| 332 | if listener |
| 333 | .wait_timeout(std::time::Duration::from_millis(10)) |
| 334 | .is_none() |
| 335 | { |
| 336 | break; |
| 337 | } |
| 338 | } |
| 339 | } |
| 340 | } |
| 341 | |