1 | //! Contains structs and traits relevant to the connection itself, and dispatching incoming messages. |
2 | |
3 | use crate::{Error, Message, MessageType, c_str_to_slice, channel::WatchFd, ffi, to_c_str}; |
4 | use crate::ffidisp::ConnPath; |
5 | use std::{fmt, mem, ptr, thread, panic, ops}; |
6 | use std::{collections::VecDeque, time::Duration}; |
7 | use std::cell::{Cell, RefCell}; |
8 | use std::os::raw::{c_void, c_char, c_int, c_uint}; |
9 | use crate::strings::{BusName, Path}; |
10 | use super::{Watch, WatchList, MessageCallback, ConnectionItem, MsgHandler, MsgHandlerList, MessageReply, BusType}; |
11 | |
12 | |
13 | /* Since we register callbacks with userdata pointers, |
14 | we need to make sure the connection pointer does not move around. |
15 | Hence this extra indirection. */ |
16 | struct IConnection { |
17 | conn: Cell<*mut ffi::DBusConnection>, |
18 | pending_items: RefCell<VecDeque<Message>>, |
19 | watches: Option<Box<WatchList>>, |
20 | handlers: RefCell<super::MsgHandlerList>, |
21 | |
22 | filter_cb: RefCell<Option<MessageCallback>>, |
23 | filter_cb_panic: RefCell<thread::Result<()>>, |
24 | } |
25 | |
26 | /// A D-Bus connection. Start here if you want to get on the D-Bus! |
27 | pub struct Connection { |
28 | i: Box<IConnection>, |
29 | } |
30 | |
31 | pub (crate) fn conn_handle(c: &Connection) -> *mut ffi::DBusConnection { |
32 | c.i.conn.get() |
33 | } |
34 | |
35 | extern "C" fn filter_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage, |
36 | user_data: *mut c_void) -> ffi::DBusHandlerResult { |
37 | |
38 | let i: &IConnection = unsafe { mem::transmute(user_data) }; |
39 | let connref: panic::AssertUnwindSafe<&Connection> = unsafe { mem::transmute(&i) }; |
40 | if i.conn.get() != conn || i.filter_cb_panic.try_borrow().is_err() { |
41 | // This should never happen, but let's be extra sure |
42 | // process::abort(); ?? |
43 | return ffi::DBusHandlerResult::Handled; |
44 | } |
45 | if i.filter_cb_panic.borrow().is_err() { |
46 | // We're in panic mode. Let's quit this ASAP |
47 | return ffi::DBusHandlerResult::Handled; |
48 | } |
49 | |
50 | let fcb = panic::AssertUnwindSafe(&i.filter_cb); |
51 | let r = panic::catch_unwind(|| { |
52 | let m = Message::from_ptr(msg, true); |
53 | let mut cb = fcb.borrow_mut().take().unwrap(); // Take the callback out while we call it. |
54 | let r = cb(connref.0, m); |
55 | let mut cb2 = fcb.borrow_mut(); // If the filter callback has not been replaced, put it back in. |
56 | if cb2.is_none() { *cb2 = Some(cb) }; |
57 | r |
58 | }); |
59 | |
60 | match r { |
61 | Ok(false) => ffi::DBusHandlerResult::NotYetHandled, |
62 | Ok(true) => ffi::DBusHandlerResult::Handled, |
63 | Err(e) => { |
64 | *i.filter_cb_panic.borrow_mut() = Err(e); |
65 | ffi::DBusHandlerResult::Handled |
66 | } |
67 | } |
68 | } |
69 | |
70 | fn default_filter_callback(c: &Connection, m: Message) -> bool { |
71 | let b: bool = m.msg_type() == MessageType::Signal; |
72 | c.i.pending_items.borrow_mut().push_back(m); |
73 | b |
74 | } |
75 | |
76 | extern "C" fn object_path_message_cb(_conn: *mut ffi::DBusConnection, _msg: *mut ffi::DBusMessage, |
77 | _user_data: *mut c_void) -> ffi::DBusHandlerResult { |
78 | /* Already pushed in filter_message_cb, so we just set the handled flag here to disable the |
79 | "default" handler. */ |
80 | ffi::DBusHandlerResult::Handled |
81 | } |
82 | |
83 | impl Connection { |
84 | #[inline (always)] |
85 | fn conn(&self) -> *mut ffi::DBusConnection { |
86 | self.i.conn.get() |
87 | } |
88 | |
89 | fn conn_from_ptr(conn: *mut ffi::DBusConnection) -> Result<Connection, Error> { |
90 | let mut c = Connection { i: Box::new(IConnection { |
91 | conn: Cell::new(conn), |
92 | pending_items: RefCell::new(VecDeque::new()), |
93 | watches: None, |
94 | handlers: RefCell::new(vec!()), |
95 | filter_cb: RefCell::new(Some(Box::new(default_filter_callback))), |
96 | filter_cb_panic: RefCell::new(Ok(())), |
97 | })}; |
98 | |
99 | /* No, we don't want our app to suddenly quit if dbus goes down */ |
100 | unsafe { ffi::dbus_connection_set_exit_on_disconnect(conn, 0) }; |
101 | assert!(unsafe { |
102 | ffi::dbus_connection_add_filter(c.conn(), Some(filter_message_cb), mem::transmute(&*c.i), None) |
103 | } != 0); |
104 | |
105 | c.i.watches = Some(WatchList::new(&c, Box::new(|_| {}))); |
106 | Ok(c) |
107 | } |
108 | |
109 | /// Creates a new connection to the session bus. |
110 | /// |
111 | /// Just a shortcut for `get_private(BusType::Session)`. |
112 | pub fn new_session() -> Result<Connection, Error> { Self::get_private(BusType::Session) } |
113 | |
114 | /// Creates a new connection to the system bus. |
115 | /// |
116 | /// Just a shortcut for `get_private(BusType::System)`. |
117 | pub fn new_system() -> Result<Connection, Error> { Self::get_private(BusType::System) } |
118 | |
119 | /// Creates a new D-Bus connection. |
120 | pub fn get_private(bus: BusType) -> Result<Connection, Error> { |
121 | let mut e = Error::empty(); |
122 | let conn = unsafe { ffi::dbus_bus_get_private(bus, e.get_mut()) }; |
123 | if conn.is_null() { |
124 | return Err(e) |
125 | } |
126 | Self::conn_from_ptr(conn) |
127 | } |
128 | |
129 | /// Creates a new D-Bus connection to a remote address. |
130 | /// |
131 | /// Note: for all common cases (System / Session bus) you probably want "get_private" instead. |
132 | pub fn open_private(address: &str) -> Result<Connection, Error> { |
133 | let mut e = Error::empty(); |
134 | let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) }; |
135 | if conn.is_null() { |
136 | return Err(e) |
137 | } |
138 | Self::conn_from_ptr(conn) |
139 | } |
140 | |
141 | /// Registers a new D-Bus connection with the bus. |
142 | /// |
143 | /// Note: `get_private` does this automatically, useful with `open_private` |
144 | pub fn register(&self) -> Result<(), Error> { |
145 | let mut e = Error::empty(); |
146 | if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } { |
147 | Err(e) |
148 | } else { |
149 | Ok(()) |
150 | } |
151 | } |
152 | |
153 | /// Gets whether the connection is currently open. |
154 | pub fn is_connected(&self) -> bool { |
155 | unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 } |
156 | } |
157 | |
158 | /// Sends a message over the D-Bus and waits for a reply. |
159 | /// This is usually used for method calls. |
160 | pub fn send_with_reply_and_block(&self, msg: Message, timeout_ms: i32) -> Result<Message, Error> { |
161 | let mut e = Error::empty(); |
162 | let response = unsafe { |
163 | ffi::dbus_connection_send_with_reply_and_block(self.conn(), msg.ptr(), |
164 | timeout_ms as c_int, e.get_mut()) |
165 | }; |
166 | if response.is_null() { |
167 | return Err(e); |
168 | } |
169 | Ok(Message::from_ptr(response, false)) |
170 | } |
171 | |
172 | /// Sends a message over the D-Bus without waiting. Useful for sending signals and method call replies. |
173 | pub fn send(&self, msg: Message) -> Result<u32,()> { |
174 | let mut serial = 0u32; |
175 | let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) }; |
176 | if r == 0 { return Err(()); } |
177 | unsafe { ffi::dbus_connection_flush(self.conn()) }; |
178 | Ok(serial) |
179 | } |
180 | |
181 | /// Sends a message over the D-Bus, returning a MessageReply. |
182 | /// |
183 | /// Call add_handler on the result to start waiting for reply. This should be done before next call to `incoming` or `iter`. |
184 | pub fn send_with_reply<'a, F: FnOnce(Result<&Message, Error>) + 'a>(&self, msg: Message, f: F) -> Result<MessageReply<F>, ()> { |
185 | let serial = self.send(msg)?; |
186 | Ok(MessageReply(Some(f), serial)) |
187 | } |
188 | |
189 | /// Adds a message handler to the connection. |
190 | /// |
191 | /// # Example |
192 | /// |
193 | /// ``` |
194 | /// use std::{cell, rc}; |
195 | /// use dbus::{ffidisp::Connection, Message}; |
196 | /// |
197 | /// let c = Connection::new_session().unwrap(); |
198 | /// let m = Message::new_method_call("org.freedesktop.DBus" , "/" , "org.freedesktop.DBus" , "ListNames" ).unwrap(); |
199 | /// |
200 | /// let done: rc::Rc<cell::Cell<bool>> = Default::default(); |
201 | /// let done2 = done.clone(); |
202 | /// c.add_handler(c.send_with_reply(m, move |reply| { |
203 | /// let v: Vec<&str> = reply.unwrap().read1().unwrap(); |
204 | /// println!("The names on the D-Bus are: {:?}" , v); |
205 | /// done2.set(true); |
206 | /// }).unwrap()); |
207 | /// while !done.get() { c.incoming(100).next(); } |
208 | /// ``` |
209 | pub fn add_handler<H: MsgHandler + 'static>(&self, h: H) { |
210 | let h = Box::new(h); |
211 | self.i.handlers.borrow_mut().push(h); |
212 | } |
213 | |
214 | /// Removes a MsgHandler from the connection. |
215 | /// |
216 | /// If there are many MsgHandlers, it is not specified which one will be returned. |
217 | /// |
218 | /// There might be more methods added later on, which give better ways to deal |
219 | /// with the list of MsgHandler currently on the connection. If this would help you, |
220 | /// please [file an issue](https://github.com/diwic/dbus-rs/issues). |
221 | pub fn extract_handler(&self) -> Option<Box<dyn MsgHandler>> { |
222 | self.i.handlers.borrow_mut().pop() |
223 | } |
224 | |
225 | /// Get the connection's unique name. |
226 | pub fn unique_name(&self) -> String { |
227 | let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) }; |
228 | c_str_to_slice(&c).unwrap_or("" ).to_string() |
229 | } |
230 | |
231 | /// Check if there are new incoming events |
232 | /// |
233 | /// If there are no incoming events, ConnectionItems::Nothing will be returned. |
234 | /// See ConnectionItems::new if you want to customize this behaviour. |
235 | pub fn iter(&self, timeout_ms: i32) -> ConnectionItems { |
236 | ConnectionItems::new(self, Some(timeout_ms), false) |
237 | } |
238 | |
239 | /// Check if there are new incoming events |
240 | /// |
241 | /// Supersedes "iter". |
242 | pub fn incoming(&self, timeout_ms: u32) -> ConnMsgs<&Self> { |
243 | ConnMsgs { conn: &self, timeout_ms: Some(timeout_ms) } |
244 | } |
245 | |
246 | /// Register an object path. |
247 | pub fn register_object_path(&self, path: &str) -> Result<(), Error> { |
248 | let mut e = Error::empty(); |
249 | let p = to_c_str(path); |
250 | let vtable = ffi::DBusObjectPathVTable { |
251 | unregister_function: None, |
252 | message_function: Some(object_path_message_cb), |
253 | dbus_internal_pad1: None, |
254 | dbus_internal_pad2: None, |
255 | dbus_internal_pad3: None, |
256 | dbus_internal_pad4: None, |
257 | }; |
258 | let r = unsafe { |
259 | let user_data: *mut c_void = mem::transmute(&*self.i); |
260 | ffi::dbus_connection_try_register_object_path(self.conn(), p.as_ptr(), &vtable, user_data, e.get_mut()) |
261 | }; |
262 | if r == 0 { Err(e) } else { Ok(()) } |
263 | } |
264 | |
265 | /// Unregister an object path. |
266 | pub fn unregister_object_path(&self, path: &str) { |
267 | let p = to_c_str(path); |
268 | let r = unsafe { ffi::dbus_connection_unregister_object_path(self.conn(), p.as_ptr()) }; |
269 | if r == 0 { panic!("Out of memory" ); } |
270 | } |
271 | |
272 | /// List registered object paths. |
273 | pub fn list_registered_object_paths(&self, path: &str) -> Vec<String> { |
274 | let p = to_c_str(path); |
275 | let mut clist: *mut *mut c_char = ptr::null_mut(); |
276 | let r = unsafe { ffi::dbus_connection_list_registered(self.conn(), p.as_ptr(), &mut clist) }; |
277 | if r == 0 { panic!("Out of memory" ); } |
278 | let mut v = Vec::new(); |
279 | let mut i = 0; |
280 | loop { |
281 | let s = unsafe { |
282 | let citer = clist.offset(i); |
283 | if *citer == ptr::null_mut() { break }; |
284 | mem::transmute(citer) |
285 | }; |
286 | v.push(format!(" {}" , c_str_to_slice(s).unwrap())); |
287 | i += 1; |
288 | } |
289 | unsafe { ffi::dbus_free_string_array(clist) }; |
290 | v |
291 | } |
292 | |
293 | /// Register a name. |
294 | pub fn register_name(&self, name: &str, flags: u32) -> Result<super::RequestNameReply, Error> { |
295 | let mut e = Error::empty(); |
296 | let n = to_c_str(name); |
297 | let r = unsafe { ffi::dbus_bus_request_name(self.conn(), n.as_ptr(), flags, e.get_mut()) }; |
298 | if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) } |
299 | } |
300 | |
301 | /// Release a name. |
302 | pub fn release_name(&self, name: &str) -> Result<super::ReleaseNameReply, Error> { |
303 | let mut e = Error::empty(); |
304 | let n = to_c_str(name); |
305 | let r = unsafe { ffi::dbus_bus_release_name(self.conn(), n.as_ptr(), e.get_mut()) }; |
306 | if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) } |
307 | } |
308 | |
309 | /// Add a match rule to match messages on the message bus. |
310 | /// |
311 | /// See the `unity_focused_window` example for how to use this to catch signals. |
312 | /// (The syntax of the "rule" string is specified in the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing-match-rules).) |
313 | pub fn add_match(&self, rule: &str) -> Result<(), Error> { |
314 | let mut e = Error::empty(); |
315 | let n = to_c_str(rule); |
316 | unsafe { ffi::dbus_bus_add_match(self.conn(), n.as_ptr(), e.get_mut()) }; |
317 | if e.name().is_some() { Err(e) } else { Ok(()) } |
318 | } |
319 | |
320 | /// Remove a match rule to match messages on the message bus. |
321 | pub fn remove_match(&self, rule: &str) -> Result<(), Error> { |
322 | let mut e = Error::empty(); |
323 | let n = to_c_str(rule); |
324 | unsafe { ffi::dbus_bus_remove_match(self.conn(), n.as_ptr(), e.get_mut()) }; |
325 | if e.name().is_some() { Err(e) } else { Ok(()) } |
326 | } |
327 | |
328 | /// Async I/O: Get an up-to-date list of file descriptors to watch. |
329 | /// |
330 | /// See the `Watch` struct for an example. |
331 | pub fn watch_fds(&self) -> Vec<Watch> { |
332 | self.i.watches.as_ref().unwrap().get_enabled_fds() |
333 | } |
334 | |
335 | /// Async I/O: Call this function whenever you detected an event on the Fd, |
336 | /// Flags are a set of WatchEvent bits. |
337 | /// The returned iterator will return pending items only, never block for new events. |
338 | /// |
339 | /// See the `Watch` struct for an example. |
340 | pub fn watch_handle(&self, fd: WatchFd, flags: c_uint) -> ConnectionItems { |
341 | self.i.watches.as_ref().unwrap().watch_handle(fd, flags); |
342 | ConnectionItems::new(self, None, true) |
343 | } |
344 | |
345 | /// Create a convenience struct for easier calling of many methods on the same destination and path. |
346 | pub fn with_path<'a, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'a self, dest: D, path: P, timeout_ms: i32) -> |
347 | ConnPath<'a, &'a Connection> { |
348 | ConnPath { conn: self, dest: dest.into(), path: path.into(), timeout: timeout_ms } |
349 | } |
350 | |
351 | /// Replace the default message callback. Returns the previously set callback. |
352 | /// |
353 | /// By default, when you call ConnectionItems::next, all relevant incoming messages |
354 | /// are returned through the ConnectionItems iterator, and |
355 | /// irrelevant messages are passed on to libdbus's default handler. |
356 | /// If you need to customize this behaviour (i e, to handle all incoming messages yourself), |
357 | /// you can set this message callback yourself. A few caveats apply: |
358 | /// |
359 | /// Return true from the callback to disable libdbus's internal handling of the message, or |
360 | /// false to allow it. In other words, true and false correspond to |
361 | /// `DBUS_HANDLER_RESULT_HANDLED` and `DBUS_HANDLER_RESULT_NOT_YET_HANDLED` respectively. |
362 | /// |
363 | /// Be sure to call the previously set callback from inside your callback, |
364 | /// if you want, e.g. ConnectionItems::next to yield the message. |
365 | /// |
366 | /// You can unset the message callback (might be useful to satisfy the borrow checker), but |
367 | /// you will get a panic if you call ConnectionItems::next while the message callback is unset. |
/// The message callback will be temporarily unset while inside a message callback, so calling
369 | /// ConnectionItems::next recursively will also result in a panic. |
370 | /// |
371 | /// If your message callback panics, ConnectionItems::next will panic, too. |
372 | /// |
373 | /// # Examples |
374 | /// |
375 | /// Replace the default callback with our own: |
376 | /// |
377 | /// ```ignore |
378 | /// use dbus::ffidisp::Connection; |
379 | /// let c = Connection::new_session().unwrap(); |
380 | /// // Set our callback |
381 | /// c.replace_message_callback(Some(Box::new(move |conn, msg| { |
382 | /// println!("Got message: {:?}" , msg.get_items()); |
383 | /// // Let libdbus handle some things by default, |
384 | /// // like "nonexistent object" error replies to method calls |
385 | /// false |
386 | /// }))); |
387 | /// |
388 | /// for _ in c.iter(1000) { |
389 | /// // Only `ConnectionItem::Nothing` would be ever yielded here. |
390 | /// } |
391 | /// ``` |
392 | /// |
393 | /// Chain our callback to filter out some messages before `iter().next()`: |
394 | /// |
395 | /// ``` |
396 | /// use dbus::{ffidisp::Connection, MessageType}; |
397 | /// let c = Connection::new_session().unwrap(); |
398 | /// // Take the previously set callback |
399 | /// let mut old_cb = c.replace_message_callback(None).unwrap(); |
400 | /// // Set our callback |
401 | /// c.replace_message_callback(Some(Box::new(move |conn, msg| { |
402 | /// // Handle all signals on the spot |
403 | /// if msg.msg_type() == MessageType::Signal { |
404 | /// println!("Got signal: {:?}" , msg.get_items()); |
405 | /// // Stop all further processing of the message |
406 | /// return true; |
407 | /// } |
408 | /// // Delegate the rest of the messages to the previous callback |
409 | /// // in chain, e.g. to have them yielded by `iter().next()` |
410 | /// old_cb(conn, msg) |
411 | /// }))); |
412 | /// |
413 | /// # if false { |
414 | /// for _ in c.iter(1000) { |
415 | /// // `ConnectionItem::Signal` would never be yielded here. |
416 | /// } |
417 | /// # } |
418 | /// ``` |
419 | pub fn replace_message_callback(&self, f: Option<MessageCallback>) -> Option<MessageCallback> { |
420 | mem::replace(&mut *self.i.filter_cb.borrow_mut(), f) |
421 | } |
422 | |
423 | /// Sets a callback to be called if a file descriptor status changes. |
424 | /// |
425 | /// For async I/O. In rare cases, the number of fds to poll for read/write can change. |
426 | /// If this ever happens, you'll get a callback. The watch changed is provided as a parameter. |
427 | /// |
428 | /// In rare cases this might not even happen in the thread calling anything on the connection, |
429 | /// so the callback needs to be `Send`. |
430 | /// A mutex is held during the callback. If you try to call set_watch_callback from a callback, |
431 | /// you will deadlock. |
432 | /// |
433 | /// (Previously, this was instead put in a ConnectionItem queue, but this was not working correctly. |
434 | /// see https://github.com/diwic/dbus-rs/issues/99 for additional info.) |
435 | pub fn set_watch_callback(&self, f: Box<dyn Fn(Watch) + Send>) { self.i.watches.as_ref().unwrap().set_on_update(f); } |
436 | |
437 | fn check_panic(&self) { |
438 | let p = mem::replace(&mut *self.i.filter_cb_panic.borrow_mut(), Ok(())); |
439 | if let Err(perr) = p { panic::resume_unwind(perr); } |
440 | } |
441 | |
442 | fn next_msg(&self) -> Option<Message> { |
443 | while let Some(msg) = self.i.pending_items.borrow_mut().pop_front() { |
444 | let mut v: MsgHandlerList = mem::replace(&mut *self.i.handlers.borrow_mut(), vec!()); |
445 | let b = msghandler_process(&mut v, &msg, self); |
446 | let mut v2 = self.i.handlers.borrow_mut(); |
447 | v.append(&mut *v2); |
448 | *v2 = v; |
449 | if !b { return Some(msg) }; |
450 | }; |
451 | None |
452 | } |
453 | |
454 | } |
455 | |
456 | impl Drop for Connection { |
457 | fn drop(&mut self) { |
458 | unsafe { |
459 | ffi::dbus_connection_close(self.conn()); |
460 | ffi::dbus_connection_unref(self.conn()); |
461 | } |
462 | } |
463 | } |
464 | |
465 | impl fmt::Debug for Connection { |
466 | fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { |
467 | write!(f, "D-Bus Connection( {})" , self.unique_name()) |
468 | } |
469 | } |
470 | |
471 | impl crate::channel::Sender for Connection { |
472 | fn send(&self, msg: Message) -> Result<u32, ()> { Connection::send(self, msg) } |
473 | } |
474 | |
475 | impl crate::blocking::BlockingSender for Connection { |
476 | fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> { |
477 | Connection::send_with_reply_and_block(self, msg, timeout_ms:timeout.as_millis() as i32) |
478 | } |
479 | } |
480 | |
481 | |
482 | fn msghandler_process(v: &mut MsgHandlerList, m: &Message, c: &Connection) -> bool { |
483 | let mut ii: isize = -1; |
484 | loop { |
485 | ii += 1; |
486 | let i: usize = ii as usize; |
487 | if i >= v.len() { return false }; |
488 | |
489 | if !v[i].handler_type().matches_msg(m) { continue; } |
490 | if let Some(r: MsgHandlerResult) = v[i].handle_msg(m) { |
491 | for msg: Message in r.reply.into_iter() { c.send(msg).unwrap(); } |
492 | if r.done { v.remove(index:i); ii -= 1; } |
493 | if r.handled { return true; } |
494 | } |
495 | } |
496 | } |
497 | |
498 | /// ConnectionItem iterator |
499 | pub struct ConnectionItems<'a> { |
500 | c: &'a Connection, |
501 | timeout_ms: Option<i32>, |
502 | end_on_timeout: bool, |
503 | handlers: MsgHandlerList, |
504 | } |
505 | |
506 | impl<'a> ConnectionItems<'a> { |
507 | /// Builder method that adds a new msg handler. |
508 | /// |
509 | /// Note: Likely to changed/refactored/removed in next release |
510 | pub fn with<H: 'static + MsgHandler>(mut self, h: H) -> Self { |
511 | self.handlers.push(Box::new(h)); self |
512 | } |
513 | |
514 | // Returns true if processed, false if not |
515 | fn process_handlers(&mut self, ci: &ConnectionItem) -> bool { |
516 | let m = match *ci { |
517 | ConnectionItem::MethodReturn(ref msg) => msg, |
518 | ConnectionItem::Signal(ref msg) => msg, |
519 | ConnectionItem::MethodCall(ref msg) => msg, |
520 | ConnectionItem::Nothing => return false, |
521 | }; |
522 | |
523 | msghandler_process(&mut self.handlers, m, &self.c) |
524 | } |
525 | |
526 | /// Access and modify message handlers |
527 | /// |
528 | /// Note: Likely to changed/refactored/removed in next release |
529 | pub fn msg_handlers(&mut self) -> &mut Vec<Box<dyn MsgHandler>> { &mut self.handlers } |
530 | |
531 | /// Creates a new ConnectionItems iterator |
532 | /// |
533 | /// For io_timeout, setting None means the fds will not be read/written. I e, only pending |
534 | /// items in libdbus's internal queue will be processed. |
535 | /// |
536 | /// For end_on_timeout, setting false will means that the iterator will never finish (unless |
537 | /// the D-Bus server goes down). Instead, ConnectionItem::Nothing will be returned in case no |
538 | /// items are in queue. |
539 | pub fn new(conn: &'a Connection, io_timeout: Option<i32>, end_on_timeout: bool) -> Self { |
540 | ConnectionItems { |
541 | c: conn, |
542 | timeout_ms: io_timeout, |
543 | end_on_timeout: end_on_timeout, |
544 | handlers: Vec::new(), |
545 | } |
546 | } |
547 | } |
548 | |
549 | impl<'a> Iterator for ConnectionItems<'a> { |
550 | type Item = ConnectionItem; |
551 | fn next(&mut self) -> Option<ConnectionItem> { |
552 | loop { |
553 | if self.c.i.filter_cb.borrow().is_none() { panic!("ConnectionItems::next called recursively or with a MessageCallback set to None" ); } |
554 | let i: Option<ConnectionItem> = self.c.next_msg().map(|x| x.into()); |
555 | if let Some(ci) = i { |
556 | if !self.process_handlers(&ci) { return Some(ci); } |
557 | } |
558 | |
559 | if let Some(t) = self.timeout_ms { |
560 | let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.c.conn(), t as c_int) }; |
561 | self.c.check_panic(); |
562 | if !self.c.i.pending_items.borrow().is_empty() { continue }; |
563 | if r == 0 { return None; } |
564 | } |
565 | |
566 | let r = unsafe { ffi::dbus_connection_dispatch(self.c.conn()) }; |
567 | self.c.check_panic(); |
568 | |
569 | if !self.c.i.pending_items.borrow().is_empty() { continue }; |
570 | if r == ffi::DBusDispatchStatus::DataRemains { continue }; |
571 | if r == ffi::DBusDispatchStatus::Complete { return if self.end_on_timeout { None } else { Some(ConnectionItem::Nothing) } }; |
572 | panic!("dbus_connection_dispatch failed" ); |
573 | } |
574 | } |
575 | } |
576 | |
/// Iterator over incoming messages on a connection.
#[derive(Clone, Debug)]
pub struct ConnMsgs<C> {
    /// The connection or some reference to it.
    pub conn: C,
    /// How many milliseconds dbus should block, waiting for incoming messages, before timing out.
    ///
    /// If set to `None`, the dbus library will not read/write from file descriptors at all.
    /// Instead the iterator will end when there's nothing currently in the queue.
    pub timeout_ms: Option<u32>,
}
588 | |
589 | impl<C: ops::Deref<Target = Connection>> Iterator for ConnMsgs<C> { |
590 | type Item = Message; |
591 | fn next(&mut self) -> Option<Self::Item> { |
592 | |
593 | loop { |
594 | let iconn = &self.conn.i; |
595 | if iconn.filter_cb.borrow().is_none() { panic!("ConnMsgs::next called recursively or with a MessageCallback set to None" ); } |
596 | let i = self.conn.next_msg(); |
597 | if let Some(ci) = i { return Some(ci); } |
598 | |
599 | if let Some(t) = self.timeout_ms { |
600 | let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.conn.conn(), t as c_int) }; |
601 | self.conn.check_panic(); |
602 | if !iconn.pending_items.borrow().is_empty() { continue }; |
603 | if r == 0 { return None; } |
604 | } |
605 | |
606 | let r = unsafe { ffi::dbus_connection_dispatch(self.conn.conn()) }; |
607 | self.conn.check_panic(); |
608 | |
609 | if !iconn.pending_items.borrow().is_empty() { continue }; |
610 | if r == ffi::DBusDispatchStatus::DataRemains { continue }; |
611 | if r == ffi::DBusDispatchStatus::Complete { return None } |
612 | panic!("dbus_connection_dispatch failed" ); |
613 | } |
614 | } |
615 | } |
616 | |
// Sends a `ListNames` call on the session bus and waits until the reply handler has run.
#[test]
fn message_reply() {
    use std::{cell, rc};
    let conn = Connection::get_private(BusType::Session).unwrap();
    assert!(conn.is_connected());
    let call = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
    let done = rc::Rc::new(cell::Cell::new(false));
    let done_in_cb = rc::Rc::clone(&done);
    let reply = conn.send_with_reply(call, move |result| {
        let r = result.unwrap();
        let _: crate::arg::Array<&str, _> = r.get1().unwrap();
        done_in_cb.set(true);
    }).unwrap();
    for _ in conn.iter(1000).with(reply) {
        if done.get() {
            return;
        }
    }
    // Only reached if the iterator ended before the reply handler ran.
    assert!(false);
}
633 | |