1 | use crate::{channel::WatchFd, ffi}; |
2 | use libc; |
3 | use crate::ffidisp::Connection; |
4 | |
5 | use std::mem; |
6 | use std::sync::{Mutex, RwLock}; |
7 | #[cfg (unix)] |
8 | use std::os::unix::io::{RawFd, AsRawFd}; |
9 | #[cfg (windows)] |
10 | use std::os::windows::io::{RawSocket, AsRawSocket}; |
11 | #[cfg (unix)] |
12 | use libc::{POLLIN, POLLOUT, POLLERR, POLLHUP}; |
13 | #[cfg (windows)] |
14 | use winapi::um::winsock2::{POLLIN, POLLOUT, POLLERR, POLLHUP}; |
15 | use std::os::raw::{c_void, c_uint}; |
16 | |
17 | /// A file descriptor to watch for incoming events (for async I/O). |
18 | /// |
19 | /// # Example |
20 | /// ``` |
21 | /// extern crate libc; |
22 | /// extern crate dbus; |
23 | /// fn main() { |
24 | /// use dbus::ffidisp::{Connection, BusType, WatchEvent}; |
25 | /// let c = Connection::get_private(BusType::Session).unwrap(); |
26 | /// |
27 | /// // Get a list of fds to poll for |
28 | /// let mut fds: Vec<_> = c.watch_fds().iter().map(|w| w.to_pollfd()).collect(); |
29 | /// |
30 | /// // Poll them with a 1 s timeout |
/// let r = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, 1000) };
32 | /// assert!(r >= 0); |
33 | /// |
34 | /// // And handle incoming events |
35 | /// for pfd in fds.iter().filter(|pfd| pfd.revents != 0) { |
36 | /// for item in c.watch_handle(pfd.fd, WatchEvent::from_revents(pfd.revents)) { |
37 | /// // Handle item |
38 | /// println!("Received ConnectionItem: {:?}" , item); |
39 | /// } |
40 | /// } |
41 | /// } |
42 | /// ``` |
43 | |
44 | #[repr (C)] |
45 | #[derive (Debug, PartialEq, Copy, Clone)] |
46 | /// The enum is here for backwards compatibility mostly. |
47 | /// |
48 | /// It should really be bitflags instead. |
49 | pub enum WatchEvent { |
50 | /// The fd is readable |
51 | Readable = ffi::DBUS_WATCH_READABLE as isize, |
52 | /// The fd is writable |
53 | Writable = ffi::DBUS_WATCH_WRITABLE as isize, |
54 | /// An error occured on the fd |
55 | Error = ffi::DBUS_WATCH_ERROR as isize, |
56 | /// The fd received a hangup. |
57 | Hangup = ffi::DBUS_WATCH_HANGUP as isize, |
58 | } |
59 | |
60 | impl WatchEvent { |
61 | /// After running poll, this transforms the revents into a parameter you can send into `Connection::watch_handle` |
62 | pub fn from_revents(revents: libc::c_short) -> c_uint { |
63 | 0 + |
64 | if (revents & POLLIN) != 0 { WatchEvent::Readable as c_uint } else { 0 } + |
65 | if (revents & POLLOUT) != 0 { WatchEvent::Writable as c_uint } else { 0 } + |
66 | if (revents & POLLERR) != 0 { WatchEvent::Error as c_uint } else { 0 } + |
67 | if (revents & POLLHUP) != 0 { WatchEvent::Hangup as c_uint } else { 0 } |
68 | } |
69 | } |
70 | |
71 | #[derive (Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] |
72 | /// A file descriptor, and an indication whether it should be read from, written to, or both. |
73 | pub struct Watch { |
74 | fd: WatchFd, |
75 | read: bool, |
76 | write: bool, |
77 | } |
78 | |
79 | impl Watch { |
80 | /// Get the RawFd this Watch is for |
81 | pub fn fd(&self) -> WatchFd { self.fd } |
82 | /// Add POLLIN to events to listen for |
83 | pub fn readable(&self) -> bool { self.read } |
84 | /// Add POLLOUT to events to listen for |
85 | pub fn writable(&self) -> bool { self.write } |
86 | /// Returns the current watch as a libc::pollfd, to use with libc::poll |
87 | #[cfg (unix)] |
88 | pub fn to_pollfd(&self) -> libc::pollfd { |
89 | libc::pollfd { fd: self.fd, revents: 0, events: POLLERR + POLLHUP + |
90 | if self.readable() { POLLIN } else { 0 } + |
91 | if self.writable() { POLLOUT } else { 0 }, |
92 | } |
93 | } |
94 | /// Returns the current watch as a winapi::um::winsock2::WSAPOLLFD, to use with winapi::um::winsock2::WSAPoll |
95 | #[cfg (windows)] |
96 | pub fn to_pollfd(&self) -> winapi::um::winsock2::WSAPOLLFD { |
97 | winapi::um::winsock2::WSAPOLLFD { |
98 | fd: self.fd as winapi::um::winsock2::SOCKET, |
99 | revents: 0, events: 0 + |
100 | if self.readable() { POLLIN } else { 0 } + |
101 | if self.writable() { POLLOUT } else { 0 }, |
102 | } |
103 | } |
104 | } |
105 | |
106 | #[cfg (unix)] |
107 | impl AsRawFd for Watch { |
108 | fn as_raw_fd(&self) -> RawFd { self.fd } |
109 | } |
110 | |
#[cfg(windows)]
impl AsRawSocket for Watch {
    /// Exposes the watched descriptor as a `RawSocket`.
    fn as_raw_socket(&self) -> RawSocket {
        self.fd()
    }
}
115 | |
116 | /// Note - internal struct, not to be used outside API. Moving it outside its box will break things. |
117 | pub struct WatchList { |
118 | watches: RwLock<Vec<*mut ffi::DBusWatch>>, |
119 | enabled_fds: Mutex<Vec<Watch>>, |
120 | on_update: Mutex<Box<dyn Fn(Watch) + Send>>, |
121 | } |
122 | |
123 | impl WatchList { |
124 | pub fn new(c: &Connection, on_update: Box<dyn Fn(Watch) + Send>) -> Box<WatchList> { |
125 | let w = Box::new(WatchList { on_update: Mutex::new(on_update), watches: RwLock::new(vec!()), enabled_fds: Mutex::new(vec!()) }); |
126 | if unsafe { ffi::dbus_connection_set_watch_functions(crate::ffidisp::connection::conn_handle(c), |
127 | Some(add_watch_cb), Some(remove_watch_cb), Some(toggled_watch_cb), &*w as *const _ as *mut _, None) } == 0 { |
128 | panic!("dbus_connection_set_watch_functions failed" ); |
129 | } |
130 | w |
131 | } |
132 | |
133 | pub fn set_on_update(&self, on_update: Box<dyn Fn(Watch) + Send>) { *self.on_update.lock().unwrap() = on_update; } |
134 | |
135 | pub fn watch_handle(&self, fd: WatchFd, flags: c_uint) { |
136 | // println!("watch_handle {} flags {}", fd, flags); |
137 | for &q in self.watches.read().unwrap().iter() { |
138 | let w = self.get_watch(q); |
139 | if w.fd != fd { continue }; |
140 | if unsafe { ffi::dbus_watch_handle(q, flags) } == 0 { |
141 | panic!("dbus_watch_handle failed" ); |
142 | } |
143 | self.update(q); |
144 | }; |
145 | } |
146 | |
147 | pub fn get_enabled_fds(&self) -> Vec<Watch> { |
148 | self.enabled_fds.lock().unwrap().clone() |
149 | } |
150 | |
151 | fn get_watch(&self, watch: *mut ffi::DBusWatch) -> Watch { |
152 | #[cfg (unix)] |
153 | let mut w = Watch { fd: unsafe { ffi::dbus_watch_get_unix_fd(watch) }, read: false, write: false}; |
154 | #[cfg (windows)] |
155 | let mut w = Watch { fd: unsafe { ffi::dbus_watch_get_socket(watch) as RawSocket }, read: false, write: false}; |
156 | let enabled = self.watches.read().unwrap().contains(&watch) && unsafe { ffi::dbus_watch_get_enabled(watch) != 0 }; |
157 | let flags = unsafe { ffi::dbus_watch_get_flags(watch) }; |
158 | if enabled { |
159 | w.read = (flags & WatchEvent::Readable as c_uint) != 0; |
160 | w.write = (flags & WatchEvent::Writable as c_uint) != 0; |
161 | } |
162 | // println!("Get watch fd {:?} ptr {:?} enabled {:?} flags {:?}", w, watch, enabled, flags); |
163 | w |
164 | } |
165 | |
166 | fn update(&self, watch: *mut ffi::DBusWatch) { |
167 | let mut w = self.get_watch(watch); |
168 | |
169 | for &q in self.watches.read().unwrap().iter() { |
170 | if q == watch { continue }; |
171 | let ww = self.get_watch(q); |
172 | if ww.fd != w.fd { continue }; |
173 | w.read |= ww.read; |
174 | w.write |= ww.write; |
175 | } |
176 | // println!("Updated sum: {:?}", w); |
177 | |
178 | { |
179 | let mut fdarr = self.enabled_fds.lock().unwrap(); |
180 | |
181 | if w.write || w.read { |
182 | if fdarr.contains(&w) { return; } // Nothing changed |
183 | } |
184 | else if !fdarr.iter().any(|q| w.fd == q.fd) { return; } // Nothing changed |
185 | |
186 | fdarr.retain(|f| f.fd != w.fd); |
187 | if w.write || w.read { fdarr.push(w) }; |
188 | } |
189 | let func = self.on_update.lock().unwrap(); |
190 | (*func)(w); |
191 | } |
192 | } |
193 | |
194 | extern "C" fn add_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) -> u32 { |
195 | let wlist: &WatchList = unsafe { mem::transmute(src:data) }; |
196 | // println!("Add watch {:?}", watch); |
197 | wlist.watches.write().unwrap().push(watch); |
198 | wlist.update(watch); |
199 | 1 |
200 | } |
201 | |
202 | extern "C" fn remove_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { |
203 | let wlist: &WatchList = unsafe { mem::transmute(src:data) }; |
204 | // println!("Removed watch {:?}", watch); |
205 | wlist.watches.write().unwrap().retain(|w: &*mut c_void| *w != watch); |
206 | wlist.update(watch); |
207 | } |
208 | |
209 | extern "C" fn toggled_watch_cb(watch: *mut ffi::DBusWatch, data: *mut c_void) { |
210 | let wlist: &WatchList = unsafe { mem::transmute(src:data) }; |
211 | // println!("Toggled watch {:?}", watch); |
212 | wlist.update(watch); |
213 | } |
214 | |
#[cfg(test)]
mod test {
    #[cfg(unix)]
    use libc;
    use super::super::{Connection, Message, BusType, WatchEvent, ConnectionItem, MessageType};
    use super::{POLLIN, POLLOUT};

    // Sends a method call to ourselves over the session bus and drives the connection
    // purely through poll + `watch_handle` until the matching method return arrives.
    #[test]
    fn test_async() {
        let c = Connection::get_private(BusType::Session).unwrap();
        c.register_object_path("/test").unwrap();
        let m = Message::new_method_call(&c.unique_name(), "/test", "com.example.asynctest", "AsyncTest").unwrap();
        let serial = c.send(m).unwrap();
        println!("Async: sent serial {}", serial);

        let mut fds: Vec<_> = c.watch_fds().iter().map(|w| w.to_pollfd()).collect();
        let mut i = 0;
        let mut success = false;
        while !success {
            i += 1;

            // Clear the results of the previous round before polling again.
            for f in fds.iter_mut() { f.revents = 0 };

            #[cfg(unix)]
            assert!(unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, 1000) } > 0);

            #[cfg(windows)]
            assert!(unsafe { winapi::um::winsock2::WSAPoll(fds.as_mut_ptr(), fds.len() as u32, 1000) } > 0);

            for f in fds.iter().filter(|pfd| pfd.revents != 0) {
                let m = WatchEvent::from_revents(f.revents);
                println!("Async: fd {}, revents {} -> {}", f.fd, f.revents, m);
                assert!(f.revents & POLLIN != 0 || f.revents & POLLOUT != 0);

                #[cfg(unix)]
                let fd = f.fd;
                #[cfg(windows)]
                let fd = f.fd as std::os::windows::io::RawSocket;

                for e in c.watch_handle(fd, m) {
                    println!("Async: got {:?}", e);
                    match e {
                        // Our own call arrives first: answer it.
                        ConnectionItem::MethodCall(m) => {
                            assert_eq!(m.msg_type(), MessageType::MethodCall);
                            assert_eq!(&*m.path().unwrap(), "/test");
                            assert_eq!(&*m.interface().unwrap(), "com.example.asynctest");
                            assert_eq!(&*m.member().unwrap(), "AsyncTest");
                            let mut mr = Message::new_method_return(&m).unwrap();
                            mr.append_items(&["Goodies".into()]);
                            c.send(mr).unwrap();
                        }
                        // The reply to the original call ends the test.
                        ConnectionItem::MethodReturn(m) => {
                            assert_eq!(m.msg_type(), MessageType::MethodReturn);
                            assert_eq!(m.get_reply_serial().unwrap(), serial);
                            let items = m.get_items();
                            let s: &str = items[0].inner().unwrap();
                            assert_eq!(s, "Goodies");
                            success = true;
                        }
                        _ => (),
                    }
                }
                // Bail out instead of spinning forever if the reply never shows up.
                if i > 100 { panic!("test_async: no method return after 100 poll rounds") };
            }
        }
    }
}
284 | |