1//! Contains structs and traits relevant to the connection itself, and dispatching incoming messages.
2
3use crate::{Error, Message, MessageType, c_str_to_slice, channel::WatchFd, ffi, to_c_str};
4use crate::ffidisp::ConnPath;
5use std::{fmt, mem, ptr, thread, panic, ops};
6use std::{collections::VecDeque, time::Duration};
7use std::cell::{Cell, RefCell};
8use std::os::raw::{c_void, c_char, c_int, c_uint};
9use crate::strings::{BusName, Path};
10use super::{Watch, WatchList, MessageCallback, ConnectionItem, MsgHandler, MsgHandlerList, MessageReply, BusType};
11
12
13/* Since we register callbacks with userdata pointers,
14 we need to make sure the connection pointer does not move around.
15 Hence this extra indirection. */
16struct IConnection {
17 conn: Cell<*mut ffi::DBusConnection>,
18 pending_items: RefCell<VecDeque<Message>>,
19 watches: Option<Box<WatchList>>,
20 handlers: RefCell<super::MsgHandlerList>,
21
22 filter_cb: RefCell<Option<MessageCallback>>,
23 filter_cb_panic: RefCell<thread::Result<()>>,
24}
25
26/// A D-Bus connection. Start here if you want to get on the D-Bus!
27pub struct Connection {
28 i: Box<IConnection>,
29}
30
31pub (crate) fn conn_handle(c: &Connection) -> *mut ffi::DBusConnection {
32 c.i.conn.get()
33}
34
35extern "C" fn filter_message_cb(conn: *mut ffi::DBusConnection, msg: *mut ffi::DBusMessage,
36 user_data: *mut c_void) -> ffi::DBusHandlerResult {
37
38 let i: &IConnection = unsafe { mem::transmute(user_data) };
39 let connref: panic::AssertUnwindSafe<&Connection> = unsafe { mem::transmute(&i) };
40 if i.conn.get() != conn || i.filter_cb_panic.try_borrow().is_err() {
41 // This should never happen, but let's be extra sure
42 // process::abort(); ??
43 return ffi::DBusHandlerResult::Handled;
44 }
45 if i.filter_cb_panic.borrow().is_err() {
46 // We're in panic mode. Let's quit this ASAP
47 return ffi::DBusHandlerResult::Handled;
48 }
49
50 let fcb = panic::AssertUnwindSafe(&i.filter_cb);
51 let r = panic::catch_unwind(|| {
52 let m = Message::from_ptr(msg, true);
53 let mut cb = fcb.borrow_mut().take().unwrap(); // Take the callback out while we call it.
54 let r = cb(connref.0, m);
55 let mut cb2 = fcb.borrow_mut(); // If the filter callback has not been replaced, put it back in.
56 if cb2.is_none() { *cb2 = Some(cb) };
57 r
58 });
59
60 match r {
61 Ok(false) => ffi::DBusHandlerResult::NotYetHandled,
62 Ok(true) => ffi::DBusHandlerResult::Handled,
63 Err(e) => {
64 *i.filter_cb_panic.borrow_mut() = Err(e);
65 ffi::DBusHandlerResult::Handled
66 }
67 }
68}
69
70fn default_filter_callback(c: &Connection, m: Message) -> bool {
71 let b: bool = m.msg_type() == MessageType::Signal;
72 c.i.pending_items.borrow_mut().push_back(m);
73 b
74}
75
76extern "C" fn object_path_message_cb(_conn: *mut ffi::DBusConnection, _msg: *mut ffi::DBusMessage,
77 _user_data: *mut c_void) -> ffi::DBusHandlerResult {
78 /* Already pushed in filter_message_cb, so we just set the handled flag here to disable the
79 "default" handler. */
80 ffi::DBusHandlerResult::Handled
81}
82
83impl Connection {
84 #[inline(always)]
85 fn conn(&self) -> *mut ffi::DBusConnection {
86 self.i.conn.get()
87 }
88
89 fn conn_from_ptr(conn: *mut ffi::DBusConnection) -> Result<Connection, Error> {
90 let mut c = Connection { i: Box::new(IConnection {
91 conn: Cell::new(conn),
92 pending_items: RefCell::new(VecDeque::new()),
93 watches: None,
94 handlers: RefCell::new(vec!()),
95 filter_cb: RefCell::new(Some(Box::new(default_filter_callback))),
96 filter_cb_panic: RefCell::new(Ok(())),
97 })};
98
99 /* No, we don't want our app to suddenly quit if dbus goes down */
100 unsafe { ffi::dbus_connection_set_exit_on_disconnect(conn, 0) };
101 assert!(unsafe {
102 ffi::dbus_connection_add_filter(c.conn(), Some(filter_message_cb), mem::transmute(&*c.i), None)
103 } != 0);
104
105 c.i.watches = Some(WatchList::new(&c, Box::new(|_| {})));
106 Ok(c)
107 }
108
109 /// Creates a new connection to the session bus.
110 ///
111 /// Just a shortcut for `get_private(BusType::Session)`.
112 pub fn new_session() -> Result<Connection, Error> { Self::get_private(BusType::Session) }
113
114 /// Creates a new connection to the system bus.
115 ///
116 /// Just a shortcut for `get_private(BusType::System)`.
117 pub fn new_system() -> Result<Connection, Error> { Self::get_private(BusType::System) }
118
119 /// Creates a new D-Bus connection.
120 pub fn get_private(bus: BusType) -> Result<Connection, Error> {
121 let mut e = Error::empty();
122 let conn = unsafe { ffi::dbus_bus_get_private(bus, e.get_mut()) };
123 if conn.is_null() {
124 return Err(e)
125 }
126 Self::conn_from_ptr(conn)
127 }
128
129 /// Creates a new D-Bus connection to a remote address.
130 ///
131 /// Note: for all common cases (System / Session bus) you probably want "get_private" instead.
132 pub fn open_private(address: &str) -> Result<Connection, Error> {
133 let mut e = Error::empty();
134 let conn = unsafe { ffi::dbus_connection_open_private(to_c_str(address).as_ptr(), e.get_mut()) };
135 if conn.is_null() {
136 return Err(e)
137 }
138 Self::conn_from_ptr(conn)
139 }
140
141 /// Registers a new D-Bus connection with the bus.
142 ///
143 /// Note: `get_private` does this automatically, useful with `open_private`
144 pub fn register(&self) -> Result<(), Error> {
145 let mut e = Error::empty();
146 if unsafe { ffi::dbus_bus_register(self.conn(), e.get_mut()) == 0 } {
147 Err(e)
148 } else {
149 Ok(())
150 }
151 }
152
153 /// Gets whether the connection is currently open.
154 pub fn is_connected(&self) -> bool {
155 unsafe { ffi::dbus_connection_get_is_connected(self.conn()) != 0 }
156 }
157
158 /// Sends a message over the D-Bus and waits for a reply.
159 /// This is usually used for method calls.
160 pub fn send_with_reply_and_block(&self, msg: Message, timeout_ms: i32) -> Result<Message, Error> {
161 let mut e = Error::empty();
162 let response = unsafe {
163 ffi::dbus_connection_send_with_reply_and_block(self.conn(), msg.ptr(),
164 timeout_ms as c_int, e.get_mut())
165 };
166 if response.is_null() {
167 return Err(e);
168 }
169 Ok(Message::from_ptr(response, false))
170 }
171
172 /// Sends a message over the D-Bus without waiting. Useful for sending signals and method call replies.
173 pub fn send(&self, msg: Message) -> Result<u32,()> {
174 let mut serial = 0u32;
175 let r = unsafe { ffi::dbus_connection_send(self.conn(), msg.ptr(), &mut serial) };
176 if r == 0 { return Err(()); }
177 unsafe { ffi::dbus_connection_flush(self.conn()) };
178 Ok(serial)
179 }
180
181 /// Sends a message over the D-Bus, returning a MessageReply.
182 ///
183 /// Call add_handler on the result to start waiting for reply. This should be done before next call to `incoming` or `iter`.
184 pub fn send_with_reply<'a, F: FnOnce(Result<&Message, Error>) + 'a>(&self, msg: Message, f: F) -> Result<MessageReply<F>, ()> {
185 let serial = self.send(msg)?;
186 Ok(MessageReply(Some(f), serial))
187 }
188
189 /// Adds a message handler to the connection.
190 ///
191 /// # Example
192 ///
193 /// ```
194 /// use std::{cell, rc};
195 /// use dbus::{ffidisp::Connection, Message};
196 ///
197 /// let c = Connection::new_session().unwrap();
198 /// let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
199 ///
200 /// let done: rc::Rc<cell::Cell<bool>> = Default::default();
201 /// let done2 = done.clone();
202 /// c.add_handler(c.send_with_reply(m, move |reply| {
203 /// let v: Vec<&str> = reply.unwrap().read1().unwrap();
204 /// println!("The names on the D-Bus are: {:?}", v);
205 /// done2.set(true);
206 /// }).unwrap());
207 /// while !done.get() { c.incoming(100).next(); }
208 /// ```
209 pub fn add_handler<H: MsgHandler + 'static>(&self, h: H) {
210 let h = Box::new(h);
211 self.i.handlers.borrow_mut().push(h);
212 }
213
214 /// Removes a MsgHandler from the connection.
215 ///
216 /// If there are many MsgHandlers, it is not specified which one will be returned.
217 ///
218 /// There might be more methods added later on, which give better ways to deal
219 /// with the list of MsgHandler currently on the connection. If this would help you,
220 /// please [file an issue](https://github.com/diwic/dbus-rs/issues).
221 pub fn extract_handler(&self) -> Option<Box<dyn MsgHandler>> {
222 self.i.handlers.borrow_mut().pop()
223 }
224
225 /// Get the connection's unique name.
226 pub fn unique_name(&self) -> String {
227 let c = unsafe { ffi::dbus_bus_get_unique_name(self.conn()) };
228 c_str_to_slice(&c).unwrap_or("").to_string()
229 }
230
231 /// Check if there are new incoming events
232 ///
233 /// If there are no incoming events, ConnectionItems::Nothing will be returned.
234 /// See ConnectionItems::new if you want to customize this behaviour.
235 pub fn iter(&self, timeout_ms: i32) -> ConnectionItems {
236 ConnectionItems::new(self, Some(timeout_ms), false)
237 }
238
239 /// Check if there are new incoming events
240 ///
241 /// Supersedes "iter".
242 pub fn incoming(&self, timeout_ms: u32) -> ConnMsgs<&Self> {
243 ConnMsgs { conn: &self, timeout_ms: Some(timeout_ms) }
244 }
245
246 /// Register an object path.
247 pub fn register_object_path(&self, path: &str) -> Result<(), Error> {
248 let mut e = Error::empty();
249 let p = to_c_str(path);
250 let vtable = ffi::DBusObjectPathVTable {
251 unregister_function: None,
252 message_function: Some(object_path_message_cb),
253 dbus_internal_pad1: None,
254 dbus_internal_pad2: None,
255 dbus_internal_pad3: None,
256 dbus_internal_pad4: None,
257 };
258 let r = unsafe {
259 let user_data: *mut c_void = mem::transmute(&*self.i);
260 ffi::dbus_connection_try_register_object_path(self.conn(), p.as_ptr(), &vtable, user_data, e.get_mut())
261 };
262 if r == 0 { Err(e) } else { Ok(()) }
263 }
264
265 /// Unregister an object path.
266 pub fn unregister_object_path(&self, path: &str) {
267 let p = to_c_str(path);
268 let r = unsafe { ffi::dbus_connection_unregister_object_path(self.conn(), p.as_ptr()) };
269 if r == 0 { panic!("Out of memory"); }
270 }
271
272 /// List registered object paths.
273 pub fn list_registered_object_paths(&self, path: &str) -> Vec<String> {
274 let p = to_c_str(path);
275 let mut clist: *mut *mut c_char = ptr::null_mut();
276 let r = unsafe { ffi::dbus_connection_list_registered(self.conn(), p.as_ptr(), &mut clist) };
277 if r == 0 { panic!("Out of memory"); }
278 let mut v = Vec::new();
279 let mut i = 0;
280 loop {
281 let s = unsafe {
282 let citer = clist.offset(i);
283 if *citer == ptr::null_mut() { break };
284 mem::transmute(citer)
285 };
286 v.push(format!("{}", c_str_to_slice(s).unwrap()));
287 i += 1;
288 }
289 unsafe { ffi::dbus_free_string_array(clist) };
290 v
291 }
292
293 /// Register a name.
294 pub fn register_name(&self, name: &str, flags: u32) -> Result<super::RequestNameReply, Error> {
295 let mut e = Error::empty();
296 let n = to_c_str(name);
297 let r = unsafe { ffi::dbus_bus_request_name(self.conn(), n.as_ptr(), flags, e.get_mut()) };
298 if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) }
299 }
300
301 /// Release a name.
302 pub fn release_name(&self, name: &str) -> Result<super::ReleaseNameReply, Error> {
303 let mut e = Error::empty();
304 let n = to_c_str(name);
305 let r = unsafe { ffi::dbus_bus_release_name(self.conn(), n.as_ptr(), e.get_mut()) };
306 if r == -1 { Err(e) } else { Ok(unsafe { mem::transmute(r) }) }
307 }
308
309 /// Add a match rule to match messages on the message bus.
310 ///
311 /// See the `unity_focused_window` example for how to use this to catch signals.
312 /// (The syntax of the "rule" string is specified in the [D-Bus specification](https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing-match-rules).)
313 pub fn add_match(&self, rule: &str) -> Result<(), Error> {
314 let mut e = Error::empty();
315 let n = to_c_str(rule);
316 unsafe { ffi::dbus_bus_add_match(self.conn(), n.as_ptr(), e.get_mut()) };
317 if e.name().is_some() { Err(e) } else { Ok(()) }
318 }
319
320 /// Remove a match rule to match messages on the message bus.
321 pub fn remove_match(&self, rule: &str) -> Result<(), Error> {
322 let mut e = Error::empty();
323 let n = to_c_str(rule);
324 unsafe { ffi::dbus_bus_remove_match(self.conn(), n.as_ptr(), e.get_mut()) };
325 if e.name().is_some() { Err(e) } else { Ok(()) }
326 }
327
328 /// Async I/O: Get an up-to-date list of file descriptors to watch.
329 ///
330 /// See the `Watch` struct for an example.
331 pub fn watch_fds(&self) -> Vec<Watch> {
332 self.i.watches.as_ref().unwrap().get_enabled_fds()
333 }
334
335 /// Async I/O: Call this function whenever you detected an event on the Fd,
336 /// Flags are a set of WatchEvent bits.
337 /// The returned iterator will return pending items only, never block for new events.
338 ///
339 /// See the `Watch` struct for an example.
340 pub fn watch_handle(&self, fd: WatchFd, flags: c_uint) -> ConnectionItems {
341 self.i.watches.as_ref().unwrap().watch_handle(fd, flags);
342 ConnectionItems::new(self, None, true)
343 }
344
345 /// Create a convenience struct for easier calling of many methods on the same destination and path.
346 pub fn with_path<'a, D: Into<BusName<'a>>, P: Into<Path<'a>>>(&'a self, dest: D, path: P, timeout_ms: i32) ->
347 ConnPath<'a, &'a Connection> {
348 ConnPath { conn: self, dest: dest.into(), path: path.into(), timeout: timeout_ms }
349 }
350
351 /// Replace the default message callback. Returns the previously set callback.
352 ///
353 /// By default, when you call ConnectionItems::next, all relevant incoming messages
354 /// are returned through the ConnectionItems iterator, and
355 /// irrelevant messages are passed on to libdbus's default handler.
356 /// If you need to customize this behaviour (i e, to handle all incoming messages yourself),
357 /// you can set this message callback yourself. A few caveats apply:
358 ///
359 /// Return true from the callback to disable libdbus's internal handling of the message, or
360 /// false to allow it. In other words, true and false correspond to
361 /// `DBUS_HANDLER_RESULT_HANDLED` and `DBUS_HANDLER_RESULT_NOT_YET_HANDLED` respectively.
362 ///
363 /// Be sure to call the previously set callback from inside your callback,
364 /// if you want, e.g. ConnectionItems::next to yield the message.
365 ///
366 /// You can unset the message callback (might be useful to satisfy the borrow checker), but
367 /// you will get a panic if you call ConnectionItems::next while the message callback is unset.
368 /// The message callback will be temporary unset while inside a message callback, so calling
369 /// ConnectionItems::next recursively will also result in a panic.
370 ///
371 /// If your message callback panics, ConnectionItems::next will panic, too.
372 ///
373 /// # Examples
374 ///
375 /// Replace the default callback with our own:
376 ///
377 /// ```ignore
378 /// use dbus::ffidisp::Connection;
379 /// let c = Connection::new_session().unwrap();
380 /// // Set our callback
381 /// c.replace_message_callback(Some(Box::new(move |conn, msg| {
382 /// println!("Got message: {:?}", msg.get_items());
383 /// // Let libdbus handle some things by default,
384 /// // like "nonexistent object" error replies to method calls
385 /// false
386 /// })));
387 ///
388 /// for _ in c.iter(1000) {
389 /// // Only `ConnectionItem::Nothing` would be ever yielded here.
390 /// }
391 /// ```
392 ///
393 /// Chain our callback to filter out some messages before `iter().next()`:
394 ///
395 /// ```
396 /// use dbus::{ffidisp::Connection, MessageType};
397 /// let c = Connection::new_session().unwrap();
398 /// // Take the previously set callback
399 /// let mut old_cb = c.replace_message_callback(None).unwrap();
400 /// // Set our callback
401 /// c.replace_message_callback(Some(Box::new(move |conn, msg| {
402 /// // Handle all signals on the spot
403 /// if msg.msg_type() == MessageType::Signal {
404 /// println!("Got signal: {:?}", msg.get_items());
405 /// // Stop all further processing of the message
406 /// return true;
407 /// }
408 /// // Delegate the rest of the messages to the previous callback
409 /// // in chain, e.g. to have them yielded by `iter().next()`
410 /// old_cb(conn, msg)
411 /// })));
412 ///
413 /// # if false {
414 /// for _ in c.iter(1000) {
415 /// // `ConnectionItem::Signal` would never be yielded here.
416 /// }
417 /// # }
418 /// ```
419 pub fn replace_message_callback(&self, f: Option<MessageCallback>) -> Option<MessageCallback> {
420 mem::replace(&mut *self.i.filter_cb.borrow_mut(), f)
421 }
422
423 /// Sets a callback to be called if a file descriptor status changes.
424 ///
425 /// For async I/O. In rare cases, the number of fds to poll for read/write can change.
426 /// If this ever happens, you'll get a callback. The watch changed is provided as a parameter.
427 ///
428 /// In rare cases this might not even happen in the thread calling anything on the connection,
429 /// so the callback needs to be `Send`.
430 /// A mutex is held during the callback. If you try to call set_watch_callback from a callback,
431 /// you will deadlock.
432 ///
433 /// (Previously, this was instead put in a ConnectionItem queue, but this was not working correctly.
434 /// see https://github.com/diwic/dbus-rs/issues/99 for additional info.)
435 pub fn set_watch_callback(&self, f: Box<dyn Fn(Watch) + Send>) { self.i.watches.as_ref().unwrap().set_on_update(f); }
436
437 fn check_panic(&self) {
438 let p = mem::replace(&mut *self.i.filter_cb_panic.borrow_mut(), Ok(()));
439 if let Err(perr) = p { panic::resume_unwind(perr); }
440 }
441
442 fn next_msg(&self) -> Option<Message> {
443 while let Some(msg) = self.i.pending_items.borrow_mut().pop_front() {
444 let mut v: MsgHandlerList = mem::replace(&mut *self.i.handlers.borrow_mut(), vec!());
445 let b = msghandler_process(&mut v, &msg, self);
446 let mut v2 = self.i.handlers.borrow_mut();
447 v.append(&mut *v2);
448 *v2 = v;
449 if !b { return Some(msg) };
450 };
451 None
452 }
453
454}
455
456impl Drop for Connection {
457 fn drop(&mut self) {
458 unsafe {
459 ffi::dbus_connection_close(self.conn());
460 ffi::dbus_connection_unref(self.conn());
461 }
462 }
463}
464
465impl fmt::Debug for Connection {
466 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
467 write!(f, "D-Bus Connection({})", self.unique_name())
468 }
469}
470
471impl crate::channel::Sender for Connection {
472 fn send(&self, msg: Message) -> Result<u32, ()> { Connection::send(self, msg) }
473}
474
475impl crate::blocking::BlockingSender for Connection {
476 fn send_with_reply_and_block(&self, msg: Message, timeout: Duration) -> Result<Message, Error> {
477 Connection::send_with_reply_and_block(self, msg, timeout_ms:timeout.as_millis() as i32)
478 }
479}
480
481
482fn msghandler_process(v: &mut MsgHandlerList, m: &Message, c: &Connection) -> bool {
483 let mut ii: isize = -1;
484 loop {
485 ii += 1;
486 let i: usize = ii as usize;
487 if i >= v.len() { return false };
488
489 if !v[i].handler_type().matches_msg(m) { continue; }
490 if let Some(r: MsgHandlerResult) = v[i].handle_msg(m) {
491 for msg: Message in r.reply.into_iter() { c.send(msg).unwrap(); }
492 if r.done { v.remove(index:i); ii -= 1; }
493 if r.handled { return true; }
494 }
495 }
496}
497
498/// ConnectionItem iterator
499pub struct ConnectionItems<'a> {
500 c: &'a Connection,
501 timeout_ms: Option<i32>,
502 end_on_timeout: bool,
503 handlers: MsgHandlerList,
504}
505
506impl<'a> ConnectionItems<'a> {
507 /// Builder method that adds a new msg handler.
508 ///
509 /// Note: Likely to changed/refactored/removed in next release
510 pub fn with<H: 'static + MsgHandler>(mut self, h: H) -> Self {
511 self.handlers.push(Box::new(h)); self
512 }
513
514 // Returns true if processed, false if not
515 fn process_handlers(&mut self, ci: &ConnectionItem) -> bool {
516 let m = match *ci {
517 ConnectionItem::MethodReturn(ref msg) => msg,
518 ConnectionItem::Signal(ref msg) => msg,
519 ConnectionItem::MethodCall(ref msg) => msg,
520 ConnectionItem::Nothing => return false,
521 };
522
523 msghandler_process(&mut self.handlers, m, &self.c)
524 }
525
526 /// Access and modify message handlers
527 ///
528 /// Note: Likely to changed/refactored/removed in next release
529 pub fn msg_handlers(&mut self) -> &mut Vec<Box<dyn MsgHandler>> { &mut self.handlers }
530
531 /// Creates a new ConnectionItems iterator
532 ///
533 /// For io_timeout, setting None means the fds will not be read/written. I e, only pending
534 /// items in libdbus's internal queue will be processed.
535 ///
536 /// For end_on_timeout, setting false will means that the iterator will never finish (unless
537 /// the D-Bus server goes down). Instead, ConnectionItem::Nothing will be returned in case no
538 /// items are in queue.
539 pub fn new(conn: &'a Connection, io_timeout: Option<i32>, end_on_timeout: bool) -> Self {
540 ConnectionItems {
541 c: conn,
542 timeout_ms: io_timeout,
543 end_on_timeout: end_on_timeout,
544 handlers: Vec::new(),
545 }
546 }
547}
548
549impl<'a> Iterator for ConnectionItems<'a> {
550 type Item = ConnectionItem;
551 fn next(&mut self) -> Option<ConnectionItem> {
552 loop {
553 if self.c.i.filter_cb.borrow().is_none() { panic!("ConnectionItems::next called recursively or with a MessageCallback set to None"); }
554 let i: Option<ConnectionItem> = self.c.next_msg().map(|x| x.into());
555 if let Some(ci) = i {
556 if !self.process_handlers(&ci) { return Some(ci); }
557 }
558
559 if let Some(t) = self.timeout_ms {
560 let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.c.conn(), t as c_int) };
561 self.c.check_panic();
562 if !self.c.i.pending_items.borrow().is_empty() { continue };
563 if r == 0 { return None; }
564 }
565
566 let r = unsafe { ffi::dbus_connection_dispatch(self.c.conn()) };
567 self.c.check_panic();
568
569 if !self.c.i.pending_items.borrow().is_empty() { continue };
570 if r == ffi::DBusDispatchStatus::DataRemains { continue };
571 if r == ffi::DBusDispatchStatus::Complete { return if self.end_on_timeout { None } else { Some(ConnectionItem::Nothing) } };
572 panic!("dbus_connection_dispatch failed");
573 }
574 }
575}
576
/// Iterator over incoming messages on a connection.
///
/// Created by `Connection::incoming`, or directly from any value that
/// dereferences to a `Connection`.
#[derive(Clone, Debug)]
pub struct ConnMsgs<C> {
    /// The connection or some reference to it.
    pub conn: C,
    /// How many ms dbus should block, waiting for incoming messages until timing out.
    ///
    /// If set to None, the dbus library will not read/write from file descriptors at all.
    /// Instead the iterator will end when there's nothing currently in the queue.
    pub timeout_ms: Option<u32>,
}
588
589impl<C: ops::Deref<Target = Connection>> Iterator for ConnMsgs<C> {
590 type Item = Message;
591 fn next(&mut self) -> Option<Self::Item> {
592
593 loop {
594 let iconn = &self.conn.i;
595 if iconn.filter_cb.borrow().is_none() { panic!("ConnMsgs::next called recursively or with a MessageCallback set to None"); }
596 let i = self.conn.next_msg();
597 if let Some(ci) = i { return Some(ci); }
598
599 if let Some(t) = self.timeout_ms {
600 let r = unsafe { ffi::dbus_connection_read_write_dispatch(self.conn.conn(), t as c_int) };
601 self.conn.check_panic();
602 if !iconn.pending_items.borrow().is_empty() { continue };
603 if r == 0 { return None; }
604 }
605
606 let r = unsafe { ffi::dbus_connection_dispatch(self.conn.conn()) };
607 self.conn.check_panic();
608
609 if !iconn.pending_items.borrow().is_empty() { continue };
610 if r == ffi::DBusDispatchStatus::DataRemains { continue };
611 if r == ffi::DBusDispatchStatus::Complete { return None }
612 panic!("dbus_connection_dispatch failed");
613 }
614 }
615}
616
// Sends a `ListNames` call on the session bus and checks that the reply closure
// attached through `ConnectionItems::with` runs within the iteration.
#[test]
fn message_reply() {
    use std::cell::Cell;
    use std::rc::Rc;

    let c = Connection::get_private(BusType::Session).unwrap();
    assert!(c.is_connected());

    let m = Message::new_method_call("org.freedesktop.DBus", "/", "org.freedesktop.DBus", "ListNames").unwrap();
    let quit = Rc::new(Cell::new(false));
    let quit2 = Rc::clone(&quit);
    let reply = c.send_with_reply(m, move |result| {
        let r = result.unwrap();
        let _: crate::arg::Array<&str, _> = r.get1().unwrap();
        quit2.set(true);
    }).unwrap();

    let items = c.iter(1000).with(reply);
    for _ in items {
        if quit.get() {
            return;
        }
    }
    assert!(false);
}
633