1use std::{collections::vec_deque::VecDeque, io, time::Duration};
2
3#[cfg(unix)]
4use crate::event::source::unix::UnixInternalEventSource;
5#[cfg(windows)]
6use crate::event::source::windows::WindowsEventSource;
7#[cfg(feature = "event-stream")]
8use crate::event::sys::Waker;
9use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
10
11/// Can be used to read `InternalEvent`s.
12pub(crate) struct InternalEventReader {
13 events: VecDeque<InternalEvent>,
14 source: Option<Box<dyn EventSource>>,
15 skipped_events: Vec<InternalEvent>,
16}
17
18impl Default for InternalEventReader {
19 fn default() -> Self {
20 #[cfg(windows)]
21 let source = WindowsEventSource::new();
22 #[cfg(unix)]
23 let source: Result = UnixInternalEventSource::new();
24
25 let source: Option> = source.ok().map(|x: UnixInternalEventSource| Box::new(x) as Box<dyn EventSource>);
26
27 InternalEventReader {
28 source,
29 events: VecDeque::with_capacity(32),
30 skipped_events: Vec::with_capacity(32),
31 }
32 }
33}
34
35impl InternalEventReader {
36 /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
37 #[cfg(feature = "event-stream")]
38 pub(crate) fn waker(&self) -> Waker {
39 self.source.as_ref().expect("reader source not set").waker()
40 }
41
42 pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
43 where
44 F: Filter,
45 {
46 for event in &self.events {
47 if filter.eval(event) {
48 return Ok(true);
49 }
50 }
51
52 let event_source = match self.source.as_mut() {
53 Some(source) => source,
54 None => {
55 return Err(std::io::Error::new(
56 std::io::ErrorKind::Other,
57 "Failed to initialize input reader",
58 ))
59 }
60 };
61
62 let poll_timeout = PollTimeout::new(timeout);
63
64 loop {
65 let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
66 Ok(None) => None,
67 Ok(Some(event)) => {
68 if filter.eval(&event) {
69 Some(event)
70 } else {
71 self.skipped_events.push(event);
72 None
73 }
74 }
75 Err(e) => {
76 if e.kind() == io::ErrorKind::Interrupted {
77 return Ok(false);
78 }
79
80 return Err(e);
81 }
82 };
83
84 if poll_timeout.elapsed() || maybe_event.is_some() {
85 self.events.extend(self.skipped_events.drain(..));
86
87 if let Some(event) = maybe_event {
88 self.events.push_front(event);
89 return Ok(true);
90 }
91
92 return Ok(false);
93 }
94 }
95 }
96
97 pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
98 where
99 F: Filter,
100 {
101 let mut skipped_events = VecDeque::new();
102
103 loop {
104 while let Some(event) = self.events.pop_front() {
105 if filter.eval(&event) {
106 while let Some(event) = skipped_events.pop_front() {
107 self.events.push_back(event);
108 }
109
110 return Ok(event);
111 } else {
112 // We can not directly write events back to `self.events`.
113 // If we did, we would put our self's into an endless loop
114 // that would enqueue -> dequeue -> enqueue etc.
115 // This happens because `poll` in this function will always return true if there are events in it's.
116 // And because we just put the non-fulfilling event there this is going to be the case.
117 // Instead we can store them into the temporary buffer,
118 // and then when the filter is fulfilled write all events back in order.
119 skipped_events.push_back(event);
120 }
121 }
122
123 let _ = self.poll(None, filter)?;
124 }
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use std::io;
131 use std::{collections::VecDeque, time::Duration};
132
133 #[cfg(unix)]
134 use super::super::filter::CursorPositionFilter;
135 use super::{
136 super::{filter::InternalEventFilter, Event},
137 EventSource, InternalEvent, InternalEventReader,
138 };
139
140 #[test]
141 fn test_poll_fails_without_event_source() {
142 let mut reader = InternalEventReader {
143 events: VecDeque::new(),
144 source: None,
145 skipped_events: Vec::with_capacity(32),
146 };
147
148 assert!(reader.poll(None, &InternalEventFilter).is_err());
149 assert!(reader
150 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
151 .is_err());
152 assert!(reader
153 .poll(Some(Duration::from_secs(10)), &InternalEventFilter)
154 .is_err());
155 }
156
157 #[test]
158 fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
159 let mut reader = InternalEventReader {
160 events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(),
161 source: None,
162 skipped_events: Vec::with_capacity(32),
163 };
164
165 assert!(reader.poll(None, &InternalEventFilter).unwrap());
166 }
167
168 #[test]
169 #[cfg(unix)]
170 fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
171 let mut reader = InternalEventReader {
172 events: vec![
173 InternalEvent::Event(Event::Resize(10, 10)),
174 InternalEvent::CursorPosition(10, 20),
175 ]
176 .into(),
177 source: None,
178 skipped_events: Vec::with_capacity(32),
179 };
180
181 assert!(reader.poll(None, &CursorPositionFilter).unwrap());
182 }
183
184 #[test]
185 fn test_read_returns_matching_event_in_queue_at_front() {
186 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
187
188 let mut reader = InternalEventReader {
189 events: vec![EVENT].into(),
190 source: None,
191 skipped_events: Vec::with_capacity(32),
192 };
193
194 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
195 }
196
197 #[test]
198 #[cfg(unix)]
199 fn test_read_returns_matching_event_in_queue_at_back() {
200 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
201
202 let mut reader = InternalEventReader {
203 events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(),
204 source: None,
205 skipped_events: Vec::with_capacity(32),
206 };
207
208 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
209 }
210
211 #[test]
212 #[cfg(unix)]
213 fn test_read_does_not_consume_skipped_event() {
214 const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
215 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
216
217 let mut reader = InternalEventReader {
218 events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(),
219 source: None,
220 skipped_events: Vec::with_capacity(32),
221 };
222
223 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
224 assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT);
225 }
226
227 #[test]
228 fn test_poll_timeouts_if_source_has_no_events() {
229 let source = FakeSource::default();
230
231 let mut reader = InternalEventReader {
232 events: VecDeque::new(),
233 source: Some(Box::new(source)),
234 skipped_events: Vec::with_capacity(32),
235 };
236
237 assert!(!reader
238 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
239 .unwrap());
240 }
241
242 #[test]
243 fn test_poll_returns_true_if_source_has_at_least_one_event() {
244 let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]);
245
246 let mut reader = InternalEventReader {
247 events: VecDeque::new(),
248 source: Some(Box::new(source)),
249 skipped_events: Vec::with_capacity(32),
250 };
251
252 assert!(reader.poll(None, &InternalEventFilter).unwrap());
253 assert!(reader
254 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
255 .unwrap());
256 }
257
258 #[test]
259 fn test_reads_returns_event_if_source_has_at_least_one_event() {
260 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
261
262 let source = FakeSource::with_events(&[EVENT]);
263
264 let mut reader = InternalEventReader {
265 events: VecDeque::new(),
266 source: Some(Box::new(source)),
267 skipped_events: Vec::with_capacity(32),
268 };
269
270 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
271 }
272
273 #[test]
274 fn test_read_returns_events_if_source_has_events() {
275 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
276
277 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
278
279 let mut reader = InternalEventReader {
280 events: VecDeque::new(),
281 source: Some(Box::new(source)),
282 skipped_events: Vec::with_capacity(32),
283 };
284
285 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
286 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
287 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
288 }
289
290 #[test]
291 fn test_poll_returns_false_after_all_source_events_are_consumed() {
292 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
293
294 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
295
296 let mut reader = InternalEventReader {
297 events: VecDeque::new(),
298 source: Some(Box::new(source)),
299 skipped_events: Vec::with_capacity(32),
300 };
301
302 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
303 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
304 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
305 assert!(!reader
306 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
307 .unwrap());
308 }
309
310 #[test]
311 fn test_poll_propagates_error() {
312 let mut reader = InternalEventReader {
313 events: VecDeque::new(),
314 source: Some(Box::new(FakeSource::new(&[]))),
315 skipped_events: Vec::with_capacity(32),
316 };
317
318 assert_eq!(
319 reader
320 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
321 .err()
322 .map(|e| format!("{:?}", &e.kind())),
323 Some(format!("{:?}", io::ErrorKind::Other))
324 );
325 }
326
327 #[test]
328 fn test_read_propagates_error() {
329 let mut reader = InternalEventReader {
330 events: VecDeque::new(),
331 source: Some(Box::new(FakeSource::new(&[]))),
332 skipped_events: Vec::with_capacity(32),
333 };
334
335 assert_eq!(
336 reader
337 .read(&InternalEventFilter)
338 .err()
339 .map(|e| format!("{:?}", &e.kind())),
340 Some(format!("{:?}", io::ErrorKind::Other))
341 );
342 }
343
344 #[test]
345 fn test_poll_continues_after_error() {
346 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
347
348 let source = FakeSource::new(&[EVENT, EVENT]);
349
350 let mut reader = InternalEventReader {
351 events: VecDeque::new(),
352 source: Some(Box::new(source)),
353 skipped_events: Vec::with_capacity(32),
354 };
355
356 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
357 assert!(reader.read(&InternalEventFilter).is_err());
358 assert!(reader
359 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
360 .unwrap());
361 }
362
363 #[test]
364 fn test_read_continues_after_error() {
365 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
366
367 let source = FakeSource::new(&[EVENT, EVENT]);
368
369 let mut reader = InternalEventReader {
370 events: VecDeque::new(),
371 source: Some(Box::new(source)),
372 skipped_events: Vec::with_capacity(32),
373 };
374
375 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
376 assert!(reader.read(&InternalEventFilter).is_err());
377 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
378 }
379
380 #[derive(Default)]
381 struct FakeSource {
382 events: VecDeque<InternalEvent>,
383 error: Option<io::Error>,
384 }
385
386 impl FakeSource {
387 fn new(events: &[InternalEvent]) -> FakeSource {
388 FakeSource {
389 events: events.to_vec().into(),
390 error: Some(io::Error::new(io::ErrorKind::Other, "")),
391 }
392 }
393
394 fn with_events(events: &[InternalEvent]) -> FakeSource {
395 FakeSource {
396 events: events.to_vec().into(),
397 error: None,
398 }
399 }
400 }
401
402 impl EventSource for FakeSource {
403 fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
404 // Return error if set in case there's just one remaining event
405 if self.events.len() == 1 {
406 if let Some(error) = self.error.take() {
407 return Err(error);
408 }
409 }
410
411 // Return all events from the queue
412 if let Some(event) = self.events.pop_front() {
413 return Ok(Some(event));
414 }
415
416 // Return error if there're no more events
417 if let Some(error) = self.error.take() {
418 return Err(error);
419 }
420
421 // Timeout
422 Ok(None)
423 }
424
425 #[cfg(feature = "event-stream")]
426 fn waker(&self) -> super::super::sys::Waker {
427 unimplemented!();
428 }
429 }
430}
431

Provided by KDAB

Privacy Policy