1use crate::{channel::WatchFd, ffi};
2use libc;
3use crate::ffidisp::Connection;
4
5use std::mem;
6use std::sync::{Mutex, RwLock};
7#[cfg(unix)]
8use std::os::unix::io::{RawFd, AsRawFd};
9#[cfg(windows)]
10use std::os::windows::io::{RawSocket, AsRawSocket};
11#[cfg(unix)]
12use libc::{POLLIN, POLLOUT, POLLERR, POLLHUP};
13#[cfg(windows)]
14use winapi::um::winsock2::{POLLIN, POLLOUT, POLLERR, POLLHUP};
15use std::os::raw::{c_void, c_uint};
16
17/// A file descriptor to watch for incoming events (for async I/O).
18///
19/// # Example
20/// ```
21/// extern crate libc;
22/// extern crate dbus;
23/// fn main() {
24/// use dbus::ffidisp::{Connection, BusType, WatchEvent};
25/// let c = Connection::get_private(BusType::Session).unwrap();
26///
27/// // Get a list of fds to poll for
28/// let mut fds: Vec<_> = c.watch_fds().iter().map(|w| w.to_pollfd()).collect();
29///
30/// // Poll them with a 1 s timeout
31/// let r = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::c_ulong, 1000) };
32/// assert!(r >= 0);
33///
34/// // And handle incoming events
35/// for pfd in fds.iter().filter(|pfd| pfd.revents != 0) {
36/// for item in c.watch_handle(pfd.fd, WatchEvent::from_revents(pfd.revents)) {
37/// // Handle item
38/// println!("Received ConnectionItem: {:?}", item);
39/// }
40/// }
41/// }
42/// ```
43
44#[repr(C)]
45#[derive(Debug, PartialEq, Copy, Clone)]
46/// The enum is here for backwards compatibility mostly.
47///
48/// It should really be bitflags instead.
49pub enum WatchEvent {
50 /// The fd is readable
51 Readable = ffi::DBUS_WATCH_READABLE as isize,
52 /// The fd is writable
53 Writable = ffi::DBUS_WATCH_WRITABLE as isize,
54 /// An error occured on the fd
55 Error = ffi::DBUS_WATCH_ERROR as isize,
56 /// The fd received a hangup.
57 Hangup = ffi::DBUS_WATCH_HANGUP as isize,
58}
59
60impl WatchEvent {
61 /// After running poll, this transforms the revents into a parameter you can send into `Connection::watch_handle`
62 pub fn from_revents(revents: libc::c_short) -> c_uint {
63 0 +
64 if (revents & POLLIN) != 0 { WatchEvent::Readable as c_uint } else { 0 } +
65 if (revents & POLLOUT) != 0 { WatchEvent::Writable as c_uint } else { 0 } +
66 if (revents & POLLERR) != 0 { WatchEvent::Error as c_uint } else { 0 } +
67 if (revents & POLLHUP) != 0 { WatchEvent::Hangup as c_uint } else { 0 }
68 }
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
72/// A file descriptor, and an indication whether it should be read from, written to, or both.
73pub struct Watch {
74 fd: WatchFd,
75 read: bool,
76 write: bool,
77}
78
79impl Watch {
80 /// Get the RawFd this Watch is for
81 pub fn fd(&self) -> WatchFd { self.fd }
82 /// Add POLLIN to events to listen for
83 pub fn readable(&self) -> bool { self.read }
84 /// Add POLLOUT to events to listen for
85 pub fn writable(&self) -> bool { self.write }
86 /// Returns the current watch as a libc::pollfd, to use with libc::poll
87 #[cfg(unix)]
88 pub fn to_pollfd(&self) -> libc::pollfd {
89 libc::pollfd { fd: self.fd, revents: 0, events: POLLERR + POLLHUP +
90 if self.readable() { POLLIN } else { 0 } +
91 if self.writable() { POLLOUT } else { 0 },
92 }
93 }
94 /// Returns the current watch as a winapi::um::winsock2::WSAPOLLFD, to use with winapi::um::winsock2::WSAPoll
95 #[cfg(windows)]
96 pub fn to_pollfd(&self) -> winapi::um::winsock2::WSAPOLLFD {
97 winapi::um::winsock2::WSAPOLLFD {
98 fd: self.fd as winapi::um::winsock2::SOCKET,
99 revents: 0, events: 0 +
100 if self.readable() { POLLIN } else { 0 } +
101 if self.writable() { POLLOUT } else { 0 },
102 }
103 }
104}
105
106#[cfg(unix)]
107impl AsRawFd for Watch {
108 fn as_raw_fd(&self) -> RawFd { self.fd }
109}
110
111#[cfg(windows)]
112impl AsRawSocket for Watch {
113 fn as_raw_socket(&self) -> RawSocket { self.fd }
114}
115
116/// Note - internal struct, not to be used outside API. Moving it outside its box will break things.
117pub struct WatchList {
118 watches: RwLock<Vec<*mut ffi::DBusWatch>>,
119 enabled_fds: Mutex<Vec<Watch>>,
120 on_update: Mutex<Box<dyn Fn(Watch) + Send>>,
121}
122
123impl WatchList {
124 pub fn new(c: &Connection, on_update: Box<dyn Fn(Watch) + Send>) -> Box<WatchList> {
125 let w = Box::new(WatchList { on_update: Mutex::new(on_update), watches: RwLock::new(vec!()), enabled_fds: Mutex::new(vec!()) });
126 if unsafe { ffi::dbus_connection_set_watch_functions(crate::ffidisp::connection::conn_handle(c),
127 Some(add_watch_cb), Some(remove_watch_cb), Some(toggled_watch_cb), &*w as *const _ as *mut _, None) } == 0 {
128 panic!("dbus_connection_set_watch_functions failed");
129 }
130 w
131 }
132
133 pub fn set_on_update(&self, on_update: Box<dyn Fn(Watch) + Send>) { *self.on_update.lock().unwrap() = on_update; }
134
135 pub fn watch_handle(&self, fd: WatchFd, flags: c_uint) {
136 // println!("watch_handle {} flags {}", fd, flags);
137 for &q in self.watches.read().unwrap().iter() {
138 let w = self.get_watch(q);
139 if w.fd != fd { continue };
140 if unsafe { ffi::dbus_watch_handle(q, flags) } == 0 {
141 panic!("dbus_watch_handle failed");
142 }
143 self.update(q);
144 };
145 }
146
147 pub fn get_enabled_fds(&self) -> Vec<Watch> {
148 self.enabled_fds.lock().unwrap().clone()
149 }
150
151 fn get_watch(&self, watch: *mut ffi::DBusWatch) -> Watch {
152 #[cfg(unix)]
153 let mut w = Watch { fd: unsafe { ffi::dbus_watch_get_unix_fd(watch) }, read: false, write: false};
154 #[cfg(windows)]
155 let mut w = Watch { fd: unsafe { ffi::dbus_watch_get_socket(watch) as RawSocket }, read: false, write: false};
156 let enabled = self.watches.read().unwrap().contains(&watch) && unsafe { ffi::dbus_watch_get_enabled(watch) != 0 };
157 let flags = unsafe { ffi::dbus_watch_get_flags(watch) };
158 if enabled {
159 w.read = (flags & WatchEvent::Readable as c_uint) != 0;
160 w.write = (flags & WatchEvent::Writable as c_uint) != 0;
161 }
162 // println!("Get watch fd {:?} ptr {:?} enabled {:?} flags {:?}", w, watch, enabled, flags);
163 w
164 }
165
166 fn update(&self, watch: *mut ffi::DBusWatch) {
167 let mut w = self.get_watch(watch);
168
169 for &q in self.watches.read().unwrap().iter() {
170 if q == watch { continue };
171 let ww = self.get_watch(q);
172 if ww.fd != w.fd { continue };
173 w.read |= ww.read;
174 w.write |= ww.write;
175 }
176 // println!("Updated sum: {:?}", w);
177
178 {
179 let mut fdarr = self.enabled_fds.lock().unwrap();
180
181 if w.write || w.read {
182 if fdarr.contains(&w) { return; } // Nothing changed
183 }
184 else if !fdarr.iter().any(|q| w.fd == q.fd) { return; } // Nothing changed
185
186 fdarr.retain(|f| f.fd != w.fd);
187 if w.write || w.read { fdarr.push(w) };
188 }
189 let func = self.on_update.lock().unwrap();
190 (*func)(w);
191 }
192}
193
194extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 {
195 let wlist: &WatchList = unsafe { mem::transmute(src:data) };
196 // println!("Add watch {:?}", watch);
197 wlist.watches.write().unwrap().push(watch);
198 wlist.update(watch);
199 1
200}
201
202extern "C" fn remove_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) {
203 let wlist: &WatchList = unsafe { mem::transmute(src:data) };
204 // println!("Removed watch {:?}", watch);
205 wlist.watches.write().unwrap().retain(|w: &*mut c_void| *w != watch);
206 wlist.update(watch);
207}
208
209extern "C" fn toggled_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) {
210 let wlist: &WatchList = unsafe { mem::transmute(src:data) };
211 // println!("Toggled watch {:?}", watch);
212 wlist.update(watch);
213}
214
215#[cfg(test)]
216mod test {
217 #[cfg(unix)]
218 use libc;
219 use super::super::{Connection, Message, BusType, WatchEvent, ConnectionItem, MessageType};
220 use super::{POLLIN, POLLOUT};
221
222 #[test]
223 fn test_async() {
224 let c = Connection::get_private(BusType::Session).unwrap();
225 c.register_object_path("/test").unwrap();
226 let m = Message::new_method_call(&c.unique_name(), "/test", "com.example.asynctest", "AsyncTest").unwrap();
227 let serial = c.send(m).unwrap();
228 println!("Async: sent serial {}", serial);
229
230 let mut fds: Vec<_> = c.watch_fds().iter().map(|w| w.to_pollfd()).collect();
231 let mut new_fds = None;
232 let mut i = 0;
233 let mut success = false;
234 while !success {
235 i += 1;
236 if let Some(q) = new_fds { fds = q; new_fds = None };
237
238 for f in fds.iter_mut() { f.revents = 0 };
239
240 #[cfg(unix)]
241 assert!(unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, 1000) } > 0);
242
243 #[cfg(windows)]
244 assert!(unsafe { winapi::um::winsock2::WSAPoll(fds.as_mut_ptr(), fds.len() as u32, 1000) } > 0);
245
246 for f in fds.iter().filter(|pfd| pfd.revents != 0) {
247 let m = WatchEvent::from_revents(f.revents);
248 println!("Async: fd {}, revents {} -> {}", f.fd, f.revents, m);
249 assert!(f.revents & POLLIN != 0 || f.revents & POLLOUT != 0);
250
251 #[cfg(unix)]
252 let fd = f.fd;
253 #[cfg(windows)]
254 let fd = f.fd as std::os::windows::io::RawSocket;
255
256 for e in c.watch_handle(fd, m) {
257 println!("Async: got {:?}", e);
258 match e {
259 ConnectionItem::MethodCall(m) => {
260 assert_eq!(m.msg_type(), MessageType::MethodCall);
261 assert_eq!(&*m.path().unwrap(), "/test");
262 assert_eq!(&*m.interface().unwrap(), "com.example.asynctest");
263 assert_eq!(&*m.member().unwrap(), "AsyncTest");
264 let mut mr = Message::new_method_return(&m).unwrap();
265 mr.append_items(&["Goodies".into()]);
266 c.send(mr).unwrap();
267 }
268 ConnectionItem::MethodReturn(m) => {
269 assert_eq!(m.msg_type(), MessageType::MethodReturn);
270 assert_eq!(m.get_reply_serial().unwrap(), serial);
271 let i = m.get_items();
272 let s: &str = i[0].inner().unwrap();
273 assert_eq!(s, "Goodies");
274 success = true;
275 }
276 _ => (),
277 }
278 }
279 if i > 100 { panic!() };
280 }
281 }
282 }
283}
284