1// SPDX-License-Identifier: MIT
2
3//! Utilities for using an [`EventQueue`] from wayland-client with an event loop
4//! that performs polling with [`calloop`](https://crates.io/crates/calloop).
5//!
6//! # Example
7//!
8//! ```no_run,rust
9//! use calloop::EventLoop;
10//! use calloop_wayland_source::WaylandSource;
11//! use wayland_client::{Connection, QueueHandle};
12//!
13//! // Create a Wayland connection and a queue.
14//! let connection = Connection::connect_to_env().unwrap();
15//! let event_queue = connection.new_event_queue();
16//! let queue_handle = event_queue.handle();
17//!
18//! // Create the calloop event loop to drive everytihng.
19//! let mut event_loop: EventLoop<()> = EventLoop::try_new().unwrap();
20//! let loop_handle = event_loop.handle();
21//!
22//! // Insert the wayland source into the calloop's event loop.
23//! WaylandSource::new(connection, event_queue).insert(loop_handle).unwrap();
24//!
25//! // This will start dispatching the event loop and processing pending wayland requests.
26//! while let Ok(_) = event_loop.dispatch(None, &mut ()) {
27//! // Your logic here.
28//! }
29//! ```
30
31#![deny(unsafe_op_in_unsafe_fn)]
32use std::io;
33
34use calloop::generic::Generic;
35use calloop::{
36 EventSource, InsertError, Interest, LoopHandle, Mode, Poll, PostAction, Readiness,
37 RegistrationToken, Token, TokenFactory,
38};
39use rustix::io::Errno;
40use wayland_backend::client::{ReadEventsGuard, WaylandError};
41use wayland_client::{Connection, DispatchError, EventQueue};
42
43#[cfg(feature = "log")]
44use log::error as log_error;
45#[cfg(not(feature = "log"))]
46use std::eprintln as log_error;
47
48/// An adapter to insert an [`EventQueue`] into a calloop
49/// [`EventLoop`](calloop::EventLoop).
50///
51/// This type implements [`EventSource`] which generates an event whenever
52/// events on the event queue need to be dispatched. The event queue available
53/// in the callback calloop registers may be used to dispatch pending
54/// events using [`EventQueue::dispatch_pending`].
55///
56/// [`WaylandSource::insert`] can be used to insert this source into an event
57/// loop and automatically dispatch pending events on the event queue.
58#[derive(Debug)]
59pub struct WaylandSource<D> {
60 // In theory, we could use the same event queue inside `connection_source`
61 // However, calloop's safety requirements mean that we cannot then give
62 // mutable access to the queue, which is incompatible with our current interface
63 // Additionally, `Connection` is cheaply cloneable, so it's not a huge burden
64 queue: EventQueue<D>,
65 connection_source: Generic<Connection>,
66 read_guard: Option<ReadEventsGuard>,
67 /// Calloop's before_will_sleep method allows
68 /// skipping the sleeping by returning a `Token`.
69 /// We cannot produce this on the fly, so store it here instead
70 fake_token: Option<Token>,
71 // Some calloop event handlers don't support error handling, so we have to store the error
72 // for a short time until we reach a method which allows it
73 stored_error: Result<(), io::Error>,
74}
75
76impl<D> WaylandSource<D> {
77 /// Wrap an [`EventQueue`] as a [`WaylandSource`].
78 ///
79 /// `queue` must be from the connection `Connection`.
80 /// This is not a safety invariant, but not following this may cause
81 /// freezes or hangs
82 pub fn new(connection: Connection, queue: EventQueue<D>) -> WaylandSource<D> {
83 let connection_source = Generic::new(connection, Interest::READ, Mode::Level);
84
85 WaylandSource {
86 queue,
87 connection_source,
88 read_guard: None,
89 fake_token: None,
90 stored_error: Ok(()),
91 }
92 }
93
94 /// Access the underlying event queue
95 ///
96 /// Note that you should not replace this queue with a queue from a
97 /// different `Connection`, as that may cause freezes or other hangs.
98 pub fn queue(&mut self) -> &mut EventQueue<D> {
99 &mut self.queue
100 }
101
102 /// Access the connection to the Wayland server
103 pub fn connection(&self) -> &Connection {
104 self.connection_source.get_ref()
105 }
106
107 /// Insert this source into the given event loop.
108 ///
109 /// This adapter will pass the event loop's shared data as the `D` type for
110 /// the event loop.
111 pub fn insert(self, handle: LoopHandle<D>) -> Result<RegistrationToken, InsertError<Self>>
112 where
113 D: 'static,
114 {
115 handle.insert_source(self, |_, queue, data| queue.dispatch_pending(data))
116 }
117}
118
119impl<D> EventSource for WaylandSource<D> {
120 type Error = calloop::Error;
121 type Event = ();
122 /// The underlying event queue.
123 ///
124 /// You should call [`EventQueue::dispatch_pending`] inside your callback
125 /// using this queue.
126 type Metadata = EventQueue<D>;
127 type Ret = Result<usize, DispatchError>;
128
129 const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true;
130
131 fn process_events<F>(
132 &mut self,
133 _: Readiness,
134 _: Token,
135 mut callback: F,
136 ) -> Result<PostAction, Self::Error>
137 where
138 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
139 {
140 debug_assert!(self.read_guard.is_none());
141
142 // Take the stored error
143 std::mem::replace(&mut self.stored_error, Ok(()))?;
144
145 // We know that the event will either be a fake event
146 // produced in `before_will_sleep`, or a "real" event from the underlying
147 // source (self.queue_events). Our behaviour in both cases is the same.
148 // In theory we might want to call the process_events handler on the underlying
149 // event source. However, we know that Generic's `process_events` call is a
150 // no-op, so we just handle the event ourselves.
151
152 let queue = &mut self.queue;
153 // Dispatch any pending events in the queue
154 Self::loop_callback_pending(queue, &mut callback)?;
155
156 // Once dispatching is finished, flush the responses to the compositor
157 flush_queue(queue)?;
158
159 Ok(PostAction::Continue)
160 }
161
162 fn register(
163 &mut self,
164 poll: &mut Poll,
165 token_factory: &mut TokenFactory,
166 ) -> calloop::Result<()> {
167 self.fake_token = Some(token_factory.token());
168 self.connection_source.register(poll, token_factory)
169 }
170
171 fn reregister(
172 &mut self,
173 poll: &mut Poll,
174 token_factory: &mut TokenFactory,
175 ) -> calloop::Result<()> {
176 self.connection_source.reregister(poll, token_factory)
177 }
178
179 fn unregister(&mut self, poll: &mut Poll) -> calloop::Result<()> {
180 self.connection_source.unregister(poll)
181 }
182
183 fn before_sleep(&mut self) -> calloop::Result<Option<(Readiness, Token)>> {
184 debug_assert!(self.read_guard.is_none());
185
186 flush_queue(&mut self.queue)?;
187
188 self.read_guard = self.queue.prepare_read();
189 match self.read_guard {
190 Some(_) => Ok(None),
191 // If getting the guard failed, that means that there are
192 // events in the queue, and so we need to handle the events instantly
193 // rather than waiting on an event in polling. We tell calloop this
194 // by returning Some here. Note that the readiness value is
195 // never used, so we just need some marker
196 None => Ok(Some((Readiness::EMPTY, self.fake_token.unwrap()))),
197 }
198 }
199
200 fn before_handle_events(&mut self, events: calloop::EventIterator<'_>) {
201 // It's important that the guard isn't held whilst process_events calls occur
202 // This can use arbitrary user-provided code, which may want to use the wayland
203 // socket For example, creating a Vulkan surface needs access to the
204 // connection
205 let guard = self.read_guard.take();
206 if events.count() > 0 {
207 // Read events from the socket if any are available
208 if let Some(Err(WaylandError::Io(err))) = guard.map(ReadEventsGuard::read) {
209 // If some other thread read events before us, concurrently, that's an expected
210 // case, so this error isn't an issue. Other error kinds do need to be returned,
211 // however
212 if err.kind() != io::ErrorKind::WouldBlock {
213 // before_handle_events doesn't allow returning errors
214 // For now, cache it in self until process_events is called
215 self.stored_error = Err(err);
216 }
217 }
218 }
219 }
220}
221
222fn flush_queue<D>(queue: &mut EventQueue<D>) -> Result<(), calloop::Error> {
223 if let Err(WaylandError::Io(err: Error)) = queue.flush() {
224 // WouldBlock error means the compositor could not process all
225 // our messages quickly. Either it is slowed
226 // down or we are a spammer. Should not really
227 // happen, if it does we do nothing and will flush again later
228 if err.kind() != io::ErrorKind::WouldBlock {
229 // in case of error, forward it and fast-exit
230 log_error!("Error trying to flush the wayland display: {}", err);
231 return Err(err.into());
232 }
233 }
234 Ok(())
235}
236
237impl<D> WaylandSource<D> {
238 /// Loop over the callback until all pending messages have been dispatched.
239 fn loop_callback_pending<F>(queue: &mut EventQueue<D>, callback: &mut F) -> io::Result<()>
240 where
241 F: FnMut((), &mut EventQueue<D>) -> Result<usize, DispatchError>,
242 {
243 // Loop on the callback until no pending events are left.
244 loop {
245 match callback((), queue) {
246 // No more pending events.
247 Ok(0) => break Ok(()),
248 Ok(_) => continue,
249 Err(DispatchError::Backend(WaylandError::Io(err))) => {
250 return Err(err);
251 },
252 Err(DispatchError::Backend(WaylandError::Protocol(err))) => {
253 log_error!("Protocol error received on display: {}", err);
254
255 break Err(Errno::PROTO.into());
256 },
257 Err(DispatchError::BadMessage { interface, sender_id, opcode }) => {
258 log_error!(
259 "Bad message on interface \"{}\": (sender_id: {}, opcode: {})",
260 interface,
261 sender_id,
262 opcode,
263 );
264
265 break Err(Errno::PROTO.into());
266 },
267 }
268 }
269 }
270}
271