//! Contains some helper structs and traits common to all Connection types.
| 2 | |
| 3 | use crate::{Error, Message, to_c_str}; |
| 4 | use std::{str, time::Duration, collections::HashMap}; |
| 5 | use std::sync::{Mutex, atomic::AtomicU8, atomic::Ordering}; |
| 6 | use std::ffi::CStr; |
| 7 | use std::os::raw::{c_void, c_int}; |
| 8 | use super::{BusType, Watch, WatchFd}; |
| 9 | |
| 10 | #[derive (Debug)] |
| 11 | struct ConnHandle(*mut ffi::DBusConnection, bool); |
| 12 | |
| 13 | unsafe impl Send for ConnHandle {} |
| 14 | unsafe impl Sync for ConnHandle {} |
| 15 | |
| 16 | impl Drop for ConnHandle { |
| 17 | fn drop(&mut self) { |
| 18 | if self.1 { unsafe { |
| 19 | ffi::dbus_connection_close(self.0); |
| 20 | ffi::dbus_connection_unref(self.0); |
| 21 | }} |
| 22 | } |
| 23 | } |
| 24 | |
| 25 | #[derive (Debug, Eq, PartialEq, Hash)] |
| 26 | struct WatchHandle(*mut ffi::DBusWatch); |
| 27 | |
| 28 | unsafe impl Send for WatchHandle {} |
| 29 | unsafe impl Sync for WatchHandle {} |
| 30 | |
| 31 | /// This struct must be boxed as it is called from D-Bus callbacks! |
| 32 | #[derive (Debug)] |
| 33 | struct WatchMap { |
| 34 | conn: ConnHandle, |
| 35 | list: Mutex<HashMap<WatchHandle, (Watch, bool)>>, |
| 36 | current_rw: AtomicU8, |
| 37 | current_fd: Option<WatchFd>, |
| 38 | } |
| 39 | |
| 40 | fn calc_rw(list: &HashMap<WatchHandle, (Watch, bool)>) -> u8 { |
| 41 | let mut r: u8 = 0; |
| 42 | for (w: &Watch, b: &bool) in list.values() { |
| 43 | if *b && w.read { r |= 1; } |
| 44 | if *b && w.write { r |= 2; } |
| 45 | } |
| 46 | r |
| 47 | } |
| 48 | |
| 49 | impl WatchMap { |
| 50 | fn new(conn: ConnHandle) -> Box<WatchMap> { |
| 51 | extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 { unsafe { |
| 52 | let wm: &WatchMap = &*(data as *mut _); |
| 53 | wm.list.lock().unwrap().insert(WatchHandle(watch), Watch::from_raw_enabled(watch)); |
| 54 | 1 |
| 55 | }} |
| 56 | extern "C" fn remove_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { unsafe { |
| 57 | let wm: &WatchMap = &*(data as *mut _); |
| 58 | wm.list.lock().unwrap().remove(&WatchHandle(watch)); |
| 59 | }} |
| 60 | extern "C" fn toggled_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { unsafe { |
| 61 | let wm: &WatchMap = &*(data as *mut _); |
| 62 | let mut list = wm.list.lock().unwrap(); |
| 63 | let (_, ref mut b) = list.get_mut(&WatchHandle(watch)).unwrap(); |
| 64 | *b = ffi::dbus_watch_get_enabled(watch) != 0; |
| 65 | wm.current_rw.store(calc_rw(&list), Ordering::Release); |
| 66 | }} |
| 67 | |
| 68 | let mut wm = Box::new(WatchMap { |
| 69 | conn, list: Default::default(), current_rw: Default::default(), current_fd: None |
| 70 | }); |
| 71 | let wptr: &WatchMap = &wm; |
| 72 | if unsafe { ffi::dbus_connection_set_watch_functions(wm.conn.0, |
| 73 | Some(add_watch_cb), Some(remove_watch_cb), Some(toggled_watch_cb), wptr as *const _ as *mut _, None) } == 0 { |
| 74 | panic!("Cannot enable watch tracking (OOM?)" ) |
| 75 | } |
| 76 | |
| 77 | { |
| 78 | let list = wm.list.lock().unwrap(); |
| 79 | wm.current_rw.store(calc_rw(&list), Ordering::Release); |
| 80 | |
| 81 | // This will never panic in practice, see https://lists.freedesktop.org/archives/dbus/2019-July/017786.html |
| 82 | for (w, _) in list.values() { |
| 83 | if let Some(ref fd) = &wm.current_fd { |
| 84 | assert_eq!(*fd, w.fd); |
| 85 | } else { |
| 86 | wm.current_fd = Some(w.fd); |
| 87 | } |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | wm |
| 92 | } |
| 93 | } |
| 94 | |
| 95 | impl Drop for WatchMap { |
| 96 | fn drop(&mut self) { |
| 97 | let wptr: &WatchMap = &self; |
| 98 | if unsafe { ffi::dbus_connection_set_watch_functions(self.conn.0, |
| 99 | add_function:None, remove_function:None, toggled_function:None, data:wptr as *const _ as *mut _, free_data_function:None) } == 0 { |
| 100 | panic!("Cannot disable watch tracking (OOM?)" ) |
| 101 | } |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | /// Low-level connection - handles read/write to the socket |
| 106 | /// |
| 107 | /// You probably do not need to worry about this as you would typically |
| 108 | /// use the various blocking and non-blocking "Connection" structs instead. |
| 109 | /// |
| 110 | /// This version avoids dbus_connection_dispatch, and thus avoids |
| 111 | /// callbacks from that function. Instead the same functionality |
| 112 | /// is implemented in the various blocking and non-blocking "Connection" components. |
| 113 | /// |
| 114 | /// Blocking operations are clearly marked as such, although if you |
| 115 | /// try to access the connection from several threads at the same time, |
| 116 | /// blocking might occur due to an internal mutex inside the dbus library. |
| 117 | #[derive (Debug)] |
| 118 | pub struct Channel { |
| 119 | handle: ConnHandle, |
| 120 | watchmap: Option<Box<WatchMap>>, |
| 121 | } |
| 122 | |
| 123 | impl Drop for Channel { |
| 124 | fn drop(&mut self) { |
| 125 | self.set_watch_enabled(enable:false); // Make sure "watchmap" is destroyed before "handle" is |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | impl Channel { |
| 130 | #[inline (always)] |
| 131 | pub (crate) fn conn(&self) -> *mut ffi::DBusConnection { |
| 132 | self.handle.0 |
| 133 | } |
| 134 | |
| 135 | fn conn_from_ptr(ptr: *mut ffi::DBusConnection) -> Result<Channel, Error> { |
| 136 | let handle = ConnHandle(ptr, true); |
| 137 | |
| 138 | /* No, we don't want our app to suddenly quit if dbus goes down */ |
| 139 | unsafe { ffi::dbus_connection_set_exit_on_disconnect(ptr, 0) }; |
| 140 | |
| 141 | let c = Channel { handle, watchmap: None }; |
| 142 | |
| 143 | Ok(c) |
| 144 | } |
| 145 | |
| 146 | |
| 147 | /// Creates a new D-Bus connection. |
| 148 | /// |
| 149 | /// Blocking: until the connection is up and running. |
| 150 | pub fn get_private(bus: BusType) -> Result<Channel, Error> { |
| 151 | let mut e = Error::empty(); |
| 152 | let b = match bus { |
| 153 | BusType::Session => ffi::DBusBusType::Session, |
| 154 | BusType::System => ffi::DBusBusType::System, |
| 155 | BusType::Starter => ffi::DBusBusType::Starter, |
| 156 | }; |
| 157 | let conn = unsafe { ffi::dbus_bus_get_private(b, e.get_mut()) }; |
| 158 | if conn.is_null() { |
| 159 | return Err(e) |
| 160 | } |
| 161 | Self::conn_from_ptr(conn) |
| 162 | } |
| 163 | |
| 164 | /// Creates a new D-Bus connection to a remote address. |
| 165 | /// |
| 166 | /// Note: for all common cases (System / Session bus) you probably want "get_private" instead. |
| 167 | /// |
| 168 | /// Blocking: until the connection is established. |
| 169 | pub fn open_private(address: &str) -> Result<Channel, Error> { |
| 170 | let mut e = Error::empty(); |
| 171 | let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) }; |
| 172 | if conn.is_null() { |
| 173 | return Err(e) |
| 174 | } |
| 175 | Self::conn_from_ptr(conn) |
| 176 | } |
| 177 | |
| 178 | /// Registers a new D-Bus connection with the bus. |
| 179 | /// |
| 180 | /// Note: `get_private` does this automatically, useful with `open_private` |
| 181 | /// |
| 182 | /// Blocking: until a "Hello" response is received from the server. |
| 183 | pub fn register(&mut self) -> Result<(), Error> { |
| 184 | // This function needs to take &mut self, because it changes unique_name and unique_name takes a &self |
| 185 | let mut e = Error::empty(); |
| 186 | if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } { |
| 187 | Err(e) |
| 188 | } else { |
| 189 | Ok(()) |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | /// Gets whether the connection is currently open. |
| 194 | pub fn is_connected(&self) -> bool { |
| 195 | unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 } |
| 196 | } |
| 197 | |
| 198 | /// Get the connection's unique name. |
| 199 | /// |
| 200 | /// It's usually something like ":1.54" |
| 201 | pub fn unique_name(&self) -> Option<&str> { |
| 202 | let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) }; |
| 203 | if c.is_null() { return None; } |
| 204 | let s = unsafe { CStr::from_ptr(c) }; |
| 205 | str::from_utf8(s.to_bytes()).ok() |
| 206 | } |
| 207 | |
| 208 | |
| 209 | /// Puts a message into libdbus out queue, and tries to send it. |
| 210 | /// |
| 211 | /// Returns a serial number than can be used to match against a reply. |
| 212 | /// |
| 213 | /// Note: usually the message is sent when this call happens, but in |
| 214 | /// case internal D-Bus buffers are full, it will be left in the out queue. |
| 215 | /// Call "flush" or "read_write" to retry flushing the out queue. |
| 216 | pub fn send(&self, msg: Message) -> Result<u32, ()> { |
| 217 | let mut serial = 0u32; |
| 218 | let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) }; |
| 219 | if r == 0 { return Err(()); } |
| 220 | Ok(serial) |
| 221 | } |
| 222 | |
| 223 | /// Sends a message over the D-Bus and waits for a reply. This is used for method calls. |
| 224 | /// |
| 225 | /// Blocking: until a reply is received or the timeout expires. |
| 226 | /// |
| 227 | /// Note: In case of an error reply, this is returned as an Err(), not as a Ok(Message) with the error type. |
| 228 | /// |
| 229 | /// Note: In case pop_message and send_with_reply_and_block is called in parallel from different threads, |
| 230 | /// they might race to retreive the reply message from the internal queue. |
| 231 | pub fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> { |
| 232 | let mut e = Error::empty(); |
| 233 | let response = unsafe { |
| 234 | ffi::dbus_connection_send_with_reply_and_block(self.conn(), msg.ptr(), |
| 235 | timeout.as_millis() as c_int, e.get_mut()) |
| 236 | }; |
| 237 | if response.is_null() { |
| 238 | return Err(e); |
| 239 | } |
| 240 | Ok(Message::from_ptr(response, false)) |
| 241 | } |
| 242 | |
| 243 | /// Flush the queue of outgoing messages. |
| 244 | /// |
| 245 | /// Blocking: until the outgoing queue is empty. |
| 246 | pub fn flush(&self) { unsafe { ffi::dbus_connection_flush(self.conn()) } } |
| 247 | |
| 248 | /// Read and write to the connection. |
| 249 | /// |
| 250 | /// Incoming messages are put in the internal queue, outgoing messages are written. |
| 251 | /// |
| 252 | /// Blocking: If there are no messages, for up to timeout, or forever if timeout is None. |
| 253 | /// For non-blocking behaviour, set timeout to Some(0). |
| 254 | pub fn read_write(&self, timeout: Option<Duration>) -> Result<(), ()> { |
| 255 | let t = timeout.map_or(-1, |t| t.as_millis() as c_int); |
| 256 | if unsafe { ffi::dbus_connection_read_write(self.conn(), t) == 0 } { |
| 257 | Err(()) |
| 258 | } else { |
| 259 | Ok(()) |
| 260 | } |
| 261 | } |
| 262 | |
| 263 | /// Gets whether the output message buffer is non-empty |
| 264 | pub fn has_messages_to_send(&self) -> bool { |
| 265 | unsafe { ffi::dbus_connection_has_messages_to_send(self.conn()) == 1 } |
| 266 | } |
| 267 | |
| 268 | /// Removes a message from the incoming queue, or returns None if the queue is empty. |
| 269 | /// |
| 270 | /// Use "read_write" first, so that messages are put into the incoming queue. |
| 271 | /// For unhandled messages, please call MessageDispatcher::default_dispatch to return |
| 272 | /// default replies for method calls. |
| 273 | pub fn pop_message(&self) -> Option<Message> { |
| 274 | let mptr = unsafe { ffi::dbus_connection_pop_message(self.conn()) }; |
| 275 | if mptr.is_null() { |
| 276 | None |
| 277 | } else { |
| 278 | let msg = Message::from_ptr(mptr, false); |
| 279 | // println!("Incoming: {:?}", msg); |
| 280 | Some(msg) |
| 281 | } |
| 282 | } |
| 283 | |
| 284 | /// Removes a message from the incoming queue, or waits until timeout if the queue is empty. |
| 285 | /// |
| 286 | pub fn blocking_pop_message(&self, timeout: Duration) -> Result<Option<Message>, Error> { |
| 287 | if let Some(msg) = self.pop_message() { return Ok(Some(msg)) } |
| 288 | self.read_write(Some(timeout)).map_err(|_| |
| 289 | Error::new_failed("Failed to read/write data, disconnected from D-Bus?" ) |
| 290 | )?; |
| 291 | Ok(self.pop_message()) |
| 292 | } |
| 293 | |
| 294 | /// Enables watch tracking, a prequisite for calling watch. |
| 295 | /// |
| 296 | /// (In theory, this could panic in case libdbus ever changes to listen to |
| 297 | /// something else than one file descriptor, |
| 298 | /// but this should be extremely unlikely to ever happen.) |
| 299 | pub fn set_watch_enabled(&mut self, enable: bool) { |
| 300 | if enable == self.watchmap.is_some() { return } |
| 301 | if enable { |
| 302 | self.watchmap = Some(WatchMap::new(ConnHandle(self.conn(), false))); |
| 303 | } else { |
| 304 | self.watchmap = None; |
| 305 | } |
| 306 | } |
| 307 | |
| 308 | /// Gets the file descriptor to listen for read/write. |
| 309 | /// |
| 310 | /// Panics: if set_watch_enabled is false. |
| 311 | /// |
| 312 | /// (In theory, this could panic in case libdbus ever changes to listen to |
| 313 | /// something else than one file descriptor, |
| 314 | /// but this should be extremely unlikely to ever happen.) |
| 315 | pub fn watch(&self) -> Watch { |
| 316 | let wm = self.watchmap.as_ref().unwrap(); |
| 317 | let rw = wm.current_rw.load(Ordering::Acquire); |
| 318 | Watch { |
| 319 | fd: wm.current_fd.unwrap(), |
| 320 | read: (rw & 1) != 0, |
| 321 | write: (rw & 2) != 0, |
| 322 | } |
| 323 | } |
| 324 | |
| 325 | /// Get an up-to-date list of file descriptors to watch. |
| 326 | /// |
| 327 | /// Obsolete - in practice, you can use watch and set_watch_enabled instead. |
| 328 | #[deprecated ] |
| 329 | pub fn watch_fds(&mut self) -> Result<Vec<Watch>, ()> { |
| 330 | let en = self.watchmap.is_some(); |
| 331 | self.set_watch_enabled(true); |
| 332 | let mut wlist: Vec<Watch> = self.watchmap.as_ref().unwrap().list.lock().unwrap().values() |
| 333 | .map(|&(w, b)| Watch { fd: w.fd, read: b && w.read, write: b && w.write }) |
| 334 | .collect(); |
| 335 | self.set_watch_enabled(en); |
| 336 | |
| 337 | if wlist.len() == 2 && wlist[0].fd == wlist[1].fd { |
| 338 | // This is always true in practice, see https://lists.freedesktop.org/archives/dbus/2019-July/017786.html |
| 339 | wlist = vec!(Watch { |
| 340 | fd: wlist[0].fd, |
| 341 | read: wlist[0].read || wlist[1].read, |
| 342 | write: wlist[0].write || wlist[1].write |
| 343 | }); |
| 344 | } |
| 345 | |
| 346 | Ok(wlist) |
| 347 | } |
| 348 | } |
| 349 | |
| 350 | |
| 351 | impl Watch { |
| 352 | unsafe fn from_raw_enabled(watch: *mut ffi::DBusWatch) -> (Self, bool) { |
| 353 | #[cfg (unix)] |
| 354 | let mut w: Watch = Watch {fd: ffi::dbus_watch_get_unix_fd(watch), read: false, write: false}; |
| 355 | #[cfg (windows)] |
| 356 | let mut w = Watch {fd: ffi::dbus_watch_get_socket(watch) as WatchFd, read: false, write: false}; |
| 357 | let enabled: bool = ffi::dbus_watch_get_enabled(watch) != 0; |
| 358 | let flags: u32 = ffi::dbus_watch_get_flags(watch); |
| 359 | use std::os::raw::c_uint; |
| 360 | w.read = (flags & ffi::DBUS_WATCH_READABLE as c_uint) != 0; |
| 361 | w.write = (flags & ffi::DBUS_WATCH_WRITABLE as c_uint) != 0; |
| 362 | (w, enabled) |
| 363 | } |
| 364 | } |
| 365 | |