| 1 | use crate::{channel::WatchFd, ffi}; |
| 2 | use libc; |
| 3 | use crate::ffidisp::Connection; |
| 4 | |
| 5 | use std::mem; |
| 6 | use std::sync::{Mutex, RwLock}; |
| 7 | #[cfg (unix)] |
| 8 | use std::os::unix::io::{RawFd, AsRawFd}; |
| 9 | #[cfg (windows)] |
| 10 | use std::os::windows::io::{RawSocket, AsRawSocket}; |
| 11 | #[cfg (unix)] |
| 12 | use libc::{POLLIN, POLLOUT, POLLERR, POLLHUP}; |
| 13 | #[cfg (windows)] |
| 14 | use winapi::um::winsock2::{POLLIN, POLLOUT, POLLERR, POLLHUP}; |
| 15 | use std::os::raw::{c_void, c_uint}; |
| 16 | |
| 17 | /// A file descriptor to watch for incoming events (for async I/O). |
| 18 | /// |
| 19 | /// # Example |
| 20 | /// ``` |
| 21 | /// extern crate libc; |
| 22 | /// extern crate dbus; |
| 23 | /// fn main() { |
| 24 | /// use dbus::ffidisp::{Connection, BusType, WatchEvent}; |
| 25 | /// let c = Connection::get_private(BusType::Session).unwrap(); |
| 26 | /// |
| 27 | /// // Get a list of fds to poll for |
| 28 | /// let mut fds: Vec<_> = c.watch_fds().iter().map(|w| w.to_pollfd()).collect(); |
| 29 | /// |
| 30 | /// // Poll them with a 1 s timeout |
| 31 | /// let r = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::c_ulong, 1000) }; |
| 32 | /// assert!(r >= 0); |
| 33 | /// |
| 34 | /// // And handle incoming events |
| 35 | /// for pfd in fds.iter().filter(|pfd| pfd.revents != 0) { |
| 36 | /// for item in c.watch_handle(pfd.fd, WatchEvent::from_revents(pfd.revents)) { |
| 37 | /// // Handle item |
| 38 | /// println!("Received ConnectionItem: {:?}" , item); |
| 39 | /// } |
| 40 | /// } |
| 41 | /// } |
| 42 | /// ``` |
| 43 | |
| 44 | #[repr (C)] |
| 45 | #[derive (Debug, PartialEq, Copy, Clone)] |
| 46 | /// The enum is here for backwards compatibility mostly. |
| 47 | /// |
| 48 | /// It should really be bitflags instead. |
| 49 | pub enum WatchEvent { |
| 50 | /// The fd is readable |
| 51 | Readable = ffi::DBUS_WATCH_READABLE as isize, |
| 52 | /// The fd is writable |
| 53 | Writable = ffi::DBUS_WATCH_WRITABLE as isize, |
| 54 | /// An error occured on the fd |
| 55 | Error = ffi::DBUS_WATCH_ERROR as isize, |
| 56 | /// The fd received a hangup. |
| 57 | Hangup = ffi::DBUS_WATCH_HANGUP as isize, |
| 58 | } |
| 59 | |
| 60 | impl WatchEvent { |
| 61 | /// After running poll, this transforms the revents into a parameter you can send into `Connection::watch_handle` |
| 62 | pub fn from_revents(revents: libc::c_short) -> c_uint { |
| 63 | 0 + |
| 64 | if (revents & POLLIN) != 0 { WatchEvent::Readable as c_uint } else { 0 } + |
| 65 | if (revents & POLLOUT) != 0 { WatchEvent::Writable as c_uint } else { 0 } + |
| 66 | if (revents & POLLERR) != 0 { WatchEvent::Error as c_uint } else { 0 } + |
| 67 | if (revents & POLLHUP) != 0 { WatchEvent::Hangup as c_uint } else { 0 } |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | #[derive (Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] |
| 72 | /// A file descriptor, and an indication whether it should be read from, written to, or both. |
| 73 | pub struct Watch { |
| 74 | fd: WatchFd, |
| 75 | read: bool, |
| 76 | write: bool, |
| 77 | } |
| 78 | |
| 79 | impl Watch { |
| 80 | /// Get the RawFd this Watch is for |
| 81 | pub fn fd(&self) -> WatchFd { self.fd } |
| 82 | /// Add POLLIN to events to listen for |
| 83 | pub fn readable(&self) -> bool { self.read } |
| 84 | /// Add POLLOUT to events to listen for |
| 85 | pub fn writable(&self) -> bool { self.write } |
| 86 | /// Returns the current watch as a libc::pollfd, to use with libc::poll |
| 87 | #[cfg (unix)] |
| 88 | pub fn to_pollfd(&self) -> libc::pollfd { |
| 89 | libc::pollfd { fd: self.fd, revents: 0, events: POLLERR + POLLHUP + |
| 90 | if self.readable() { POLLIN } else { 0 } + |
| 91 | if self.writable() { POLLOUT } else { 0 }, |
| 92 | } |
| 93 | } |
| 94 | /// Returns the current watch as a winapi::um::winsock2::WSAPOLLFD, to use with winapi::um::winsock2::WSAPoll |
| 95 | #[cfg (windows)] |
| 96 | pub fn to_pollfd(&self) -> winapi::um::winsock2::WSAPOLLFD { |
| 97 | winapi::um::winsock2::WSAPOLLFD { |
| 98 | fd: self.fd as winapi::um::winsock2::SOCKET, |
| 99 | revents: 0, events: 0 + |
| 100 | if self.readable() { POLLIN } else { 0 } + |
| 101 | if self.writable() { POLLOUT } else { 0 }, |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | #[cfg (unix)] |
| 107 | impl AsRawFd for Watch { |
| 108 | fn as_raw_fd(&self) -> RawFd { self.fd } |
| 109 | } |
| 110 | |
| 111 | #[cfg (windows)] |
| 112 | impl AsRawSocket for Watch { |
| 113 | fn as_raw_socket(&self) -> RawSocket { self.fd } |
| 114 | } |
| 115 | |
| 116 | /// Note - internal struct, not to be used outside API. Moving it outside its box will break things. |
| 117 | pub struct WatchList { |
| 118 | watches: RwLock<Vec<*mut ffi::DBusWatch>>, |
| 119 | enabled_fds: Mutex<Vec<Watch>>, |
| 120 | on_update: Mutex<Box<dyn Fn(Watch) + Send>>, |
| 121 | } |
| 122 | |
| 123 | impl WatchList { |
| 124 | pub fn new(c: &Connection, on_update: Box<dyn Fn(Watch) + Send>) -> Box<WatchList> { |
| 125 | let w = Box::new(WatchList { on_update: Mutex::new(on_update), watches: RwLock::new(vec!()), enabled_fds: Mutex::new(vec!()) }); |
| 126 | if unsafe { ffi::dbus_connection_set_watch_functions(crate::ffidisp::connection::conn_handle(c), |
| 127 | Some(add_watch_cb), Some(remove_watch_cb), Some(toggled_watch_cb), &*w as *const _ as *mut _, None) } == 0 { |
| 128 | panic!("dbus_connection_set_watch_functions failed" ); |
| 129 | } |
| 130 | w |
| 131 | } |
| 132 | |
| 133 | pub fn set_on_update(&self, on_update: Box<dyn Fn(Watch) + Send>) { *self.on_update.lock().unwrap() = on_update; } |
| 134 | |
| 135 | pub fn watch_handle(&self, fd: WatchFd, flags: c_uint) { |
| 136 | // println!("watch_handle {} flags {}", fd, flags); |
| 137 | for &q in self.watches.read().unwrap().iter() { |
| 138 | let w = self.get_watch(q); |
| 139 | if w.fd != fd { continue }; |
| 140 | if unsafe { ffi::dbus_watch_handle(q, flags) } == 0 { |
| 141 | panic!("dbus_watch_handle failed" ); |
| 142 | } |
| 143 | self.update(q); |
| 144 | }; |
| 145 | } |
| 146 | |
| 147 | pub fn get_enabled_fds(&self) -> Vec<Watch> { |
| 148 | self.enabled_fds.lock().unwrap().clone() |
| 149 | } |
| 150 | |
| 151 | fn get_watch(&self, watch: *mut ffi::DBusWatch) -> Watch { |
| 152 | #[cfg (unix)] |
| 153 | let mut w = Watch { fd: unsafe { ffi::dbus_watch_get_unix_fd(watch) }, read: false, write: false}; |
| 154 | #[cfg (windows)] |
| 155 | let mut w = Watch { fd: unsafe { ffi::dbus_watch_get_socket(watch) as RawSocket }, read: false, write: false}; |
| 156 | let enabled = self.watches.read().unwrap().contains(&watch) && unsafe { ffi::dbus_watch_get_enabled(watch) != 0 }; |
| 157 | let flags = unsafe { ffi::dbus_watch_get_flags(watch) }; |
| 158 | if enabled { |
| 159 | w.read = (flags & WatchEvent::Readable as c_uint) != 0; |
| 160 | w.write = (flags & WatchEvent::Writable as c_uint) != 0; |
| 161 | } |
| 162 | // println!("Get watch fd {:?} ptr {:?} enabled {:?} flags {:?}", w, watch, enabled, flags); |
| 163 | w |
| 164 | } |
| 165 | |
| 166 | fn update(&self, watch: *mut ffi::DBusWatch) { |
| 167 | let mut w = self.get_watch(watch); |
| 168 | |
| 169 | for &q in self.watches.read().unwrap().iter() { |
| 170 | if q == watch { continue }; |
| 171 | let ww = self.get_watch(q); |
| 172 | if ww.fd != w.fd { continue }; |
| 173 | w.read |= ww.read; |
| 174 | w.write |= ww.write; |
| 175 | } |
| 176 | // println!("Updated sum: {:?}", w); |
| 177 | |
| 178 | { |
| 179 | let mut fdarr = self.enabled_fds.lock().unwrap(); |
| 180 | |
| 181 | if w.write || w.read { |
| 182 | if fdarr.contains(&w) { return; } // Nothing changed |
| 183 | } |
| 184 | else if !fdarr.iter().any(|q| w.fd == q.fd) { return; } // Nothing changed |
| 185 | |
| 186 | fdarr.retain(|f| f.fd != w.fd); |
| 187 | if w.write || w.read { fdarr.push(w) }; |
| 188 | } |
| 189 | let func = self.on_update.lock().unwrap(); |
| 190 | (*func)(w); |
| 191 | } |
| 192 | } |
| 193 | |
| 194 | extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 { |
| 195 | let wlist: &WatchList = unsafe { mem::transmute(src:data) }; |
| 196 | // println!("Add watch {:?}", watch); |
| 197 | wlist.watches.write().unwrap().push(watch); |
| 198 | wlist.update(watch); |
| 199 | 1 |
| 200 | } |
| 201 | |
| 202 | extern "C" fn remove_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { |
| 203 | let wlist: &WatchList = unsafe { mem::transmute(src:data) }; |
| 204 | // println!("Removed watch {:?}", watch); |
| 205 | wlist.watches.write().unwrap().retain(|w: &*mut c_void| *w != watch); |
| 206 | wlist.update(watch); |
| 207 | } |
| 208 | |
| 209 | extern "C" fn toggled_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { |
| 210 | let wlist: &WatchList = unsafe { mem::transmute(src:data) }; |
| 211 | // println!("Toggled watch {:?}", watch); |
| 212 | wlist.update(watch); |
| 213 | } |
| 214 | |
| 215 | #[cfg (test)] |
| 216 | mod test { |
| 217 | #[cfg (unix)] |
| 218 | use libc; |
| 219 | use super::super::{Connection, Message, BusType, WatchEvent, ConnectionItem, MessageType}; |
| 220 | use super::{POLLIN, POLLOUT}; |
| 221 | |
| 222 | #[test ] |
| 223 | fn test_async() { |
| 224 | let c = Connection::get_private(BusType::Session).unwrap(); |
| 225 | c.register_object_path("/test" ).unwrap(); |
| 226 | let m = Message::new_method_call(&c.unique_name(), "/test" , "com.example.asynctest" , "AsyncTest" ).unwrap(); |
| 227 | let serial = c.send(m).unwrap(); |
| 228 | println!("Async: sent serial {}" , serial); |
| 229 | |
| 230 | let mut fds: Vec<_> = c.watch_fds().iter().map(|w| w.to_pollfd()).collect(); |
| 231 | let mut new_fds = None; |
| 232 | let mut i = 0; |
| 233 | let mut success = false; |
| 234 | while !success { |
| 235 | i += 1; |
| 236 | if let Some(q) = new_fds { fds = q; new_fds = None }; |
| 237 | |
| 238 | for f in fds.iter_mut() { f.revents = 0 }; |
| 239 | |
| 240 | #[cfg (unix)] |
| 241 | assert!(unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, 1000) } > 0); |
| 242 | |
| 243 | #[cfg (windows)] |
| 244 | assert!(unsafe { winapi::um::winsock2::WSAPoll(fds.as_mut_ptr(), fds.len() as u32, 1000) } > 0); |
| 245 | |
| 246 | for f in fds.iter().filter(|pfd| pfd.revents != 0) { |
| 247 | let m = WatchEvent::from_revents(f.revents); |
| 248 | println!("Async: fd {}, revents {} -> {}" , f.fd, f.revents, m); |
| 249 | assert!(f.revents & POLLIN != 0 || f.revents & POLLOUT != 0); |
| 250 | |
| 251 | #[cfg (unix)] |
| 252 | let fd = f.fd; |
| 253 | #[cfg (windows)] |
| 254 | let fd = f.fd as std::os::windows::io::RawSocket; |
| 255 | |
| 256 | for e in c.watch_handle(fd, m) { |
| 257 | println!("Async: got {:?}" , e); |
| 258 | match e { |
| 259 | ConnectionItem::MethodCall(m) => { |
| 260 | assert_eq!(m.msg_type(), MessageType::MethodCall); |
| 261 | assert_eq!(&*m.path().unwrap(), "/test" ); |
| 262 | assert_eq!(&*m.interface().unwrap(), "com.example.asynctest" ); |
| 263 | assert_eq!(&*m.member().unwrap(), "AsyncTest" ); |
| 264 | let mut mr = Message::new_method_return(&m).unwrap(); |
| 265 | mr.append_items(&["Goodies" .into()]); |
| 266 | c.send(mr).unwrap(); |
| 267 | } |
| 268 | ConnectionItem::MethodReturn(m) => { |
| 269 | assert_eq!(m.msg_type(), MessageType::MethodReturn); |
| 270 | assert_eq!(m.get_reply_serial().unwrap(), serial); |
| 271 | let i = m.get_items(); |
| 272 | let s: &str = i[0].inner().unwrap(); |
| 273 | assert_eq!(s, "Goodies" ); |
| 274 | success = true; |
| 275 | } |
| 276 | _ => (), |
| 277 | } |
| 278 | } |
| 279 | if i > 100 { panic!() }; |
| 280 | } |
| 281 | } |
| 282 | } |
| 283 | } |
| 284 | |