//! Contains some helper structs and traits common to all Connection types.
2
3use crate::{Error, Message, to_c_str};
4use std::{str, time::Duration, collections::HashMap};
5use std::sync::{Mutex, atomic::AtomicU8, atomic::Ordering};
6use std::ffi::CStr;
7use std::os::raw::{c_void, c_int};
8use super::{BusType, Watch, WatchFd};
9
10#[derive(Debug)]
11struct ConnHandle(*mut ffi::DBusConnection, bool);
12
13unsafe impl Send for ConnHandle {}
14unsafe impl Sync for ConnHandle {}
15
16impl Drop for ConnHandle {
17 fn drop(&mut self) {
18 if self.1 { unsafe {
19 ffi::dbus_connection_close(self.0);
20 ffi::dbus_connection_unref(self.0);
21 }}
22 }
23}
24
25#[derive(Debug, Eq, PartialEq, Hash)]
26struct WatchHandle(*mut ffi::DBusWatch);
27
28unsafe impl Send for WatchHandle {}
29unsafe impl Sync for WatchHandle {}
30
31/// This struct must be boxed as it is called from D-Bus callbacks!
32#[derive(Debug)]
33struct WatchMap {
34 conn: ConnHandle,
35 list: Mutex<HashMap<WatchHandle, (Watch, bool)>>,
36 current_rw: AtomicU8,
37 current_fd: Option<WatchFd>,
38}
39
40fn calc_rw(list: &HashMap<WatchHandle, (Watch, bool)>) -> u8 {
41 let mut r: u8 = 0;
42 for (w: &Watch, b: &bool) in list.values() {
43 if *b && w.read { r |= 1; }
44 if *b && w.write { r |= 2; }
45 }
46 r
47}
48
49impl WatchMap {
50 fn new(conn: ConnHandle) -> Box<WatchMap> {
51 extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 { unsafe {
52 let wm: &WatchMap = &*(data as *mut _);
53 wm.list.lock().unwrap().insert(WatchHandle(watch), Watch::from_raw_enabled(watch));
54 1
55 }}
56 extern "C" fn remove_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { unsafe {
57 let wm: &WatchMap = &*(data as *mut _);
58 wm.list.lock().unwrap().remove(&WatchHandle(watch));
59 }}
60 extern "C" fn toggled_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { unsafe {
61 let wm: &WatchMap = &*(data as *mut _);
62 let mut list = wm.list.lock().unwrap();
63 let (_, ref mut b) = list.get_mut(&WatchHandle(watch)).unwrap();
64 *b = ffi::dbus_watch_get_enabled(watch) != 0;
65 wm.current_rw.store(calc_rw(&list), Ordering::Release);
66 }}
67
68 let mut wm = Box::new(WatchMap {
69 conn, list: Default::default(), current_rw: Default::default(), current_fd: None
70 });
71 let wptr: &WatchMap = &wm;
72 if unsafe { ffi::dbus_connection_set_watch_functions(wm.conn.0,
73 Some(add_watch_cb), Some(remove_watch_cb), Some(toggled_watch_cb), wptr as *const _ as *mut _, None) } == 0 {
74 panic!("Cannot enable watch tracking (OOM?)")
75 }
76
77 {
78 let list = wm.list.lock().unwrap();
79 wm.current_rw.store(calc_rw(&list), Ordering::Release);
80
81 // This will never panic in practice, see https://lists.freedesktop.org/archives/dbus/2019-July/017786.html
82 for (w, _) in list.values() {
83 if let Some(ref fd) = &wm.current_fd {
84 assert_eq!(*fd, w.fd);
85 } else {
86 wm.current_fd = Some(w.fd);
87 }
88 }
89 }
90
91 wm
92 }
93}
94
95impl Drop for WatchMap {
96 fn drop(&mut self) {
97 let wptr: &WatchMap = &self;
98 if unsafe { ffi::dbus_connection_set_watch_functions(self.conn.0,
99 add_function:None, remove_function:None, toggled_function:None, data:wptr as *const _ as *mut _, free_data_function:None) } == 0 {
100 panic!("Cannot disable watch tracking (OOM?)")
101 }
102 }
103}
104
105/// Low-level connection - handles read/write to the socket
106///
107/// You probably do not need to worry about this as you would typically
108/// use the various blocking and non-blocking "Connection" structs instead.
109///
110/// This version avoids dbus_connection_dispatch, and thus avoids
111/// callbacks from that function. Instead the same functionality
112/// is implemented in the various blocking and non-blocking "Connection" components.
113///
114/// Blocking operations are clearly marked as such, although if you
115/// try to access the connection from several threads at the same time,
116/// blocking might occur due to an internal mutex inside the dbus library.
117#[derive(Debug)]
118pub struct Channel {
119 handle: ConnHandle,
120 watchmap: Option<Box<WatchMap>>,
121}
122
123impl Drop for Channel {
124 fn drop(&mut self) {
125 self.set_watch_enabled(enable:false); // Make sure "watchmap" is destroyed before "handle" is
126 }
127}
128
129impl Channel {
130 #[inline(always)]
131 pub (crate) fn conn(&self) -> *mut ffi::DBusConnection {
132 self.handle.0
133 }
134
135 fn conn_from_ptr(ptr: *mut ffi::DBusConnection) -> Result<Channel, Error> {
136 let handle = ConnHandle(ptr, true);
137
138 /* No, we don't want our app to suddenly quit if dbus goes down */
139 unsafe { ffi::dbus_connection_set_exit_on_disconnect(ptr, 0) };
140
141 let c = Channel { handle, watchmap: None };
142
143 Ok(c)
144 }
145
146
147 /// Creates a new D-Bus connection.
148 ///
149 /// Blocking: until the connection is up and running.
150 pub fn get_private(bus: BusType) -> Result<Channel, Error> {
151 let mut e = Error::empty();
152 let b = match bus {
153 BusType::Session => ffi::DBusBusType::Session,
154 BusType::System => ffi::DBusBusType::System,
155 BusType::Starter => ffi::DBusBusType::Starter,
156 };
157 let conn = unsafe { ffi::dbus_bus_get_private(b, e.get_mut()) };
158 if conn.is_null() {
159 return Err(e)
160 }
161 Self::conn_from_ptr(conn)
162 }
163
164 /// Creates a new D-Bus connection to a remote address.
165 ///
166 /// Note: for all common cases (System / Session bus) you probably want "get_private" instead.
167 ///
168 /// Blocking: until the connection is established.
169 pub fn open_private(address: &str) -> Result<Channel, Error> {
170 let mut e = Error::empty();
171 let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) };
172 if conn.is_null() {
173 return Err(e)
174 }
175 Self::conn_from_ptr(conn)
176 }
177
178 /// Registers a new D-Bus connection with the bus.
179 ///
180 /// Note: `get_private` does this automatically, useful with `open_private`
181 ///
182 /// Blocking: until a "Hello" response is received from the server.
183 pub fn register(&mut self) -> Result<(), Error> {
184 // This function needs to take &mut self, because it changes unique_name and unique_name takes a &self
185 let mut e = Error::empty();
186 if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } {
187 Err(e)
188 } else {
189 Ok(())
190 }
191 }
192
193 /// Gets whether the connection is currently open.
194 pub fn is_connected(&self) -> bool {
195 unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 }
196 }
197
198 /// Get the connection's unique name.
199 ///
200 /// It's usually something like ":1.54"
201 pub fn unique_name(&self) -> Option<&str> {
202 let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
203 if c.is_null() { return None; }
204 let s = unsafe { CStr::from_ptr(c) };
205 str::from_utf8(s.to_bytes()).ok()
206 }
207
208
209 /// Puts a message into libdbus out queue, and tries to send it.
210 ///
211 /// Returns a serial number than can be used to match against a reply.
212 ///
213 /// Note: usually the message is sent when this call happens, but in
214 /// case internal D-Bus buffers are full, it will be left in the out queue.
215 /// Call "flush" or "read_write" to retry flushing the out queue.
216 pub fn send(&self, msg: Message) -> Result<u32, ()> {
217 let mut serial = 0u32;
218 let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) };
219 if r == 0 { return Err(()); }
220 Ok(serial)
221 }
222
223 /// Sends a message over the D-Bus and waits for a reply. This is used for method calls.
224 ///
225 /// Blocking: until a reply is received or the timeout expires.
226 ///
227 /// Note: In case of an error reply, this is returned as an Err(), not as a Ok(Message) with the error type.
228 ///
229 /// Note: In case pop_message and send_with_reply_and_block is called in parallel from different threads,
230 /// they might race to retreive the reply message from the internal queue.
231 pub fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
232 let mut e = Error::empty();
233 let response = unsafe {
234 ffi::dbus_connection_send_with_reply_and_block(self.conn(), msg.ptr(),
235 timeout.as_millis() as c_int, e.get_mut())
236 };
237 if response.is_null() {
238 return Err(e);
239 }
240 Ok(Message::from_ptr(response, false))
241 }
242
243 /// Flush the queue of outgoing messages.
244 ///
245 /// Blocking: until the outgoing queue is empty.
246 pub fn flush(&self) { unsafe { ffi::dbus_connection_flush(self.conn()) } }
247
248 /// Read and write to the connection.
249 ///
250 /// Incoming messages are put in the internal queue, outgoing messages are written.
251 ///
252 /// Blocking: If there are no messages, for up to timeout, or forever if timeout is None.
253 /// For non-blocking behaviour, set timeout to Some(0).
254 pub fn read_write(&self, timeout: Option<Duration>) -> Result<(), ()> {
255 let t = timeout.map_or(-1, |t| t.as_millis() as c_int);
256 if unsafe { ffi::dbus_connection_read_write(self.conn(), t) == 0 } {
257 Err(())
258 } else {
259 Ok(())
260 }
261 }
262
263 /// Gets whether the output message buffer is non-empty
264 pub fn has_messages_to_send(&self) -> bool {
265 unsafe { ffi::dbus_connection_has_messages_to_send(self.conn()) == 1 }
266 }
267
268 /// Removes a message from the incoming queue, or returns None if the queue is empty.
269 ///
270 /// Use "read_write" first, so that messages are put into the incoming queue.
271 /// For unhandled messages, please call MessageDispatcher::default_dispatch to return
272 /// default replies for method calls.
273 pub fn pop_message(&self) -> Option<Message> {
274 let mptr = unsafe { ffi::dbus_connection_pop_message(self.conn()) };
275 if mptr.is_null() {
276 None
277 } else {
278 let msg = Message::from_ptr(mptr, false);
279 // println!("Incoming: {:?}", msg);
280 Some(msg)
281 }
282 }
283
284 /// Removes a message from the incoming queue, or waits until timeout if the queue is empty.
285 ///
286 pub fn blocking_pop_message(&self, timeout: Duration) -> Result<Option<Message>, Error> {
287 if let Some(msg) = self.pop_message() { return Ok(Some(msg)) }
288 self.read_write(Some(timeout)).map_err(|_|
289 Error::new_failed("Failed to read/write data, disconnected from D-Bus?")
290 )?;
291 Ok(self.pop_message())
292 }
293
294 /// Enables watch tracking, a prequisite for calling watch.
295 ///
296 /// (In theory, this could panic in case libdbus ever changes to listen to
297 /// something else than one file descriptor,
298 /// but this should be extremely unlikely to ever happen.)
299 pub fn set_watch_enabled(&mut self, enable: bool) {
300 if enable == self.watchmap.is_some() { return }
301 if enable {
302 self.watchmap = Some(WatchMap::new(ConnHandle(self.conn(), false)));
303 } else {
304 self.watchmap = None;
305 }
306 }
307
308 /// Gets the file descriptor to listen for read/write.
309 ///
310 /// Panics: if set_watch_enabled is false.
311 ///
312 /// (In theory, this could panic in case libdbus ever changes to listen to
313 /// something else than one file descriptor,
314 /// but this should be extremely unlikely to ever happen.)
315 pub fn watch(&self) -> Watch {
316 let wm = self.watchmap.as_ref().unwrap();
317 let rw = wm.current_rw.load(Ordering::Acquire);
318 Watch {
319 fd: wm.current_fd.unwrap(),
320 read: (rw & 1) != 0,
321 write: (rw & 2) != 0,
322 }
323 }
324
325 /// Get an up-to-date list of file descriptors to watch.
326 ///
327 /// Obsolete - in practice, you can use watch and set_watch_enabled instead.
328 #[deprecated]
329 pub fn watch_fds(&mut self) -> Result<Vec<Watch>, ()> {
330 let en = self.watchmap.is_some();
331 self.set_watch_enabled(true);
332 let mut wlist: Vec<Watch> = self.watchmap.as_ref().unwrap().list.lock().unwrap().values()
333 .map(|&(w, b)| Watch { fd: w.fd, read: b && w.read, write: b && w.write })
334 .collect();
335 self.set_watch_enabled(en);
336
337 if wlist.len() == 2 && wlist[0].fd == wlist[1].fd {
338 // This is always true in practice, see https://lists.freedesktop.org/archives/dbus/2019-July/017786.html
339 wlist = vec!(Watch {
340 fd: wlist[0].fd,
341 read: wlist[0].read || wlist[1].read,
342 write: wlist[0].write || wlist[1].write
343 });
344 }
345
346 Ok(wlist)
347 }
348}
349
350
351impl Watch {
352 unsafe fn from_raw_enabled(watch: *mut ffi::DBusWatch) -> (Self, bool) {
353 #[cfg(unix)]
354 let mut w: Watch = Watch {fd: ffi::dbus_watch_get_unix_fd(watch), read: false, write: false};
355 #[cfg(windows)]
356 let mut w = Watch {fd: ffi::dbus_watch_get_socket(watch) as WatchFd, read: false, write: false};
357 let enabled: bool = ffi::dbus_watch_get_enabled(watch) != 0;
358 let flags: u32 = ffi::dbus_watch_get_flags(watch);
359 use std::os::raw::c_uint;
360 w.read = (flags & ffi::DBUS_WATCH_READABLE as c_uint) != 0;
361 w.write = (flags & ffi::DBUS_WATCH_WRITABLE as c_uint) != 0;
362 (w, enabled)
363 }
364}
365