1use std::{collections::vec_deque::VecDeque, io, time::Duration};
2
3#[cfg(unix)]
4use crate::event::source::unix::UnixInternalEventSource;
5#[cfg(windows)]
6use crate::event::source::windows::WindowsEventSource;
7#[cfg(feature = "event-stream")]
8use crate::event::sys::Waker;
9use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
10
/// Can be used to read `InternalEvent`s.
///
/// Events obtained from the source are buffered in `events`; `poll` fills the
/// buffer and `read` hands buffered events out.
pub(crate) struct InternalEventReader {
    /// Buffered events that have not been returned by `read` yet.
    events: VecDeque<InternalEvent>,
    /// The source events are read from; `None` when no source could be created,
    /// in which case `poll` reports an error once it needs the source.
    source: Option<Box<dyn EventSource>>,
    /// Events that `poll` read from the source but that its filter rejected;
    /// `poll` moves them back into `events`.
    skipped_events: Vec<InternalEvent>,
}
17
18impl Default for InternalEventReader {
19 fn default() -> Self {
20 #[cfg(windows)]
21 let source = WindowsEventSource::new();
22 #[cfg(unix)]
23 let source: Result = UnixInternalEventSource::new();
24
25 let source: Option> = source.ok().map(|x: UnixInternalEventSource| Box::new(x) as Box<dyn EventSource>);
26
27 InternalEventReader {
28 source,
29 events: VecDeque::with_capacity(32),
30 skipped_events: Vec::with_capacity(32),
31 }
32 }
33}
34
35impl InternalEventReader {
36 /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
37 #[cfg(feature = "event-stream")]
38 pub(crate) fn waker(&self) -> Waker {
39 self.source.as_ref().expect("reader source not set").waker()
40 }
41
42 pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
43 where
44 F: Filter,
45 {
46 for event in &self.events {
47 if filter.eval(event) {
48 return Ok(true);
49 }
50 }
51
52 let event_source = match self.source.as_mut() {
53 Some(source) => source,
54 None => {
55 return Err(std::io::Error::new(
56 std::io::ErrorKind::Other,
57 "Failed to initialize input reader",
58 ))
59 }
60 };
61
62 let poll_timeout = PollTimeout::new(timeout);
63
64 loop {
65 let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
66 Ok(None) => None,
67 Ok(Some(event)) => {
68 if filter.eval(&event) {
69 Some(event)
70 } else {
71 self.skipped_events.push(event);
72 None
73 }
74 }
75 Err(e) => {
76 if e.kind() == io::ErrorKind::Interrupted {
77 return Ok(false);
78 }
79
80 return Err(e);
81 }
82 };
83
84 if poll_timeout.elapsed() || maybe_event.is_some() {
85 self.events.extend(self.skipped_events.drain(..));
86
87 if let Some(event) = maybe_event {
88 self.events.push_front(event);
89 return Ok(true);
90 }
91
92 return Ok(false);
93 }
94 }
95 }
96
97 pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
98 where
99 F: Filter,
100 {
101 let mut skipped_events = VecDeque::new();
102
103 loop {
104 while let Some(event) = self.events.pop_front() {
105 if filter.eval(&event) {
106 while let Some(event) = skipped_events.pop_front() {
107 self.events.push_back(event);
108 }
109
110 return Ok(event);
111 } else {
112 // We can not directly write events back to `self.events`.
113 // If we did, we would put our self's into an endless loop
114 // that would enqueue -> dequeue -> enqueue etc.
115 // This happens because `poll` in this function will always return true if there are events in it's.
116 // And because we just put the non-fulfilling event there this is going to be the case.
117 // Instead we can store them into the temporary buffer,
118 // and then when the filter is fulfilled write all events back in order.
119 skipped_events.push_back(event);
120 }
121 }
122
123 let _ = self.poll(None, filter)?;
124 }
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use std::io;
131 use std::{collections::VecDeque, time::Duration};
132
133 #[cfg(unix)]
134 use super::super::filter::CursorPositionFilter;
135 use super::{super::Event, EventSource, Filter, InternalEvent, InternalEventReader};
136
137 #[derive(Debug, Clone)]
138 pub(crate) struct InternalEventFilter;
139
140 impl Filter for InternalEventFilter {
141 fn eval(&self, _: &InternalEvent) -> bool {
142 true
143 }
144 }
145
146 #[test]
147 fn test_poll_fails_without_event_source() {
148 let mut reader = InternalEventReader {
149 events: VecDeque::new(),
150 source: None,
151 skipped_events: Vec::with_capacity(32),
152 };
153
154 assert!(reader.poll(None, &InternalEventFilter).is_err());
155 assert!(reader
156 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
157 .is_err());
158 assert!(reader
159 .poll(Some(Duration::from_secs(10)), &InternalEventFilter)
160 .is_err());
161 }
162
163 #[test]
164 fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
165 let mut reader = InternalEventReader {
166 events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(),
167 source: None,
168 skipped_events: Vec::with_capacity(32),
169 };
170
171 assert!(reader.poll(None, &InternalEventFilter).unwrap());
172 }
173
174 #[test]
175 #[cfg(unix)]
176 fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
177 let mut reader = InternalEventReader {
178 events: vec![
179 InternalEvent::Event(Event::Resize(10, 10)),
180 InternalEvent::CursorPosition(10, 20),
181 ]
182 .into(),
183 source: None,
184 skipped_events: Vec::with_capacity(32),
185 };
186
187 assert!(reader.poll(None, &CursorPositionFilter).unwrap());
188 }
189
190 #[test]
191 fn test_read_returns_matching_event_in_queue_at_front() {
192 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
193
194 let mut reader = InternalEventReader {
195 events: vec![EVENT].into(),
196 source: None,
197 skipped_events: Vec::with_capacity(32),
198 };
199
200 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
201 }
202
203 #[test]
204 #[cfg(unix)]
205 fn test_read_returns_matching_event_in_queue_at_back() {
206 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
207
208 let mut reader = InternalEventReader {
209 events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(),
210 source: None,
211 skipped_events: Vec::with_capacity(32),
212 };
213
214 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
215 }
216
217 #[test]
218 #[cfg(unix)]
219 fn test_read_does_not_consume_skipped_event() {
220 const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
221 const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
222
223 let mut reader = InternalEventReader {
224 events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(),
225 source: None,
226 skipped_events: Vec::with_capacity(32),
227 };
228
229 assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
230 assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT);
231 }
232
233 #[test]
234 fn test_poll_timeouts_if_source_has_no_events() {
235 let source = FakeSource::default();
236
237 let mut reader = InternalEventReader {
238 events: VecDeque::new(),
239 source: Some(Box::new(source)),
240 skipped_events: Vec::with_capacity(32),
241 };
242
243 assert!(!reader
244 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
245 .unwrap());
246 }
247
248 #[test]
249 fn test_poll_returns_true_if_source_has_at_least_one_event() {
250 let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]);
251
252 let mut reader = InternalEventReader {
253 events: VecDeque::new(),
254 source: Some(Box::new(source)),
255 skipped_events: Vec::with_capacity(32),
256 };
257
258 assert!(reader.poll(None, &InternalEventFilter).unwrap());
259 assert!(reader
260 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
261 .unwrap());
262 }
263
264 #[test]
265 fn test_reads_returns_event_if_source_has_at_least_one_event() {
266 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
267
268 let source = FakeSource::with_events(&[EVENT]);
269
270 let mut reader = InternalEventReader {
271 events: VecDeque::new(),
272 source: Some(Box::new(source)),
273 skipped_events: Vec::with_capacity(32),
274 };
275
276 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
277 }
278
279 #[test]
280 fn test_read_returns_events_if_source_has_events() {
281 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
282
283 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
284
285 let mut reader = InternalEventReader {
286 events: VecDeque::new(),
287 source: Some(Box::new(source)),
288 skipped_events: Vec::with_capacity(32),
289 };
290
291 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
292 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
293 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
294 }
295
296 #[test]
297 fn test_poll_returns_false_after_all_source_events_are_consumed() {
298 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
299
300 let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
301
302 let mut reader = InternalEventReader {
303 events: VecDeque::new(),
304 source: Some(Box::new(source)),
305 skipped_events: Vec::with_capacity(32),
306 };
307
308 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
309 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
310 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
311 assert!(!reader
312 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
313 .unwrap());
314 }
315
316 #[test]
317 fn test_poll_propagates_error() {
318 let mut reader = InternalEventReader {
319 events: VecDeque::new(),
320 source: Some(Box::new(FakeSource::new(&[]))),
321 skipped_events: Vec::with_capacity(32),
322 };
323
324 assert_eq!(
325 reader
326 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
327 .err()
328 .map(|e| format!("{:?}", &e.kind())),
329 Some(format!("{:?}", io::ErrorKind::Other))
330 );
331 }
332
333 #[test]
334 fn test_read_propagates_error() {
335 let mut reader = InternalEventReader {
336 events: VecDeque::new(),
337 source: Some(Box::new(FakeSource::new(&[]))),
338 skipped_events: Vec::with_capacity(32),
339 };
340
341 assert_eq!(
342 reader
343 .read(&InternalEventFilter)
344 .err()
345 .map(|e| format!("{:?}", &e.kind())),
346 Some(format!("{:?}", io::ErrorKind::Other))
347 );
348 }
349
350 #[test]
351 fn test_poll_continues_after_error() {
352 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
353
354 let source = FakeSource::new(&[EVENT, EVENT]);
355
356 let mut reader = InternalEventReader {
357 events: VecDeque::new(),
358 source: Some(Box::new(source)),
359 skipped_events: Vec::with_capacity(32),
360 };
361
362 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
363 assert!(reader.read(&InternalEventFilter).is_err());
364 assert!(reader
365 .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
366 .unwrap());
367 }
368
369 #[test]
370 fn test_read_continues_after_error() {
371 const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
372
373 let source = FakeSource::new(&[EVENT, EVENT]);
374
375 let mut reader = InternalEventReader {
376 events: VecDeque::new(),
377 source: Some(Box::new(source)),
378 skipped_events: Vec::with_capacity(32),
379 };
380
381 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
382 assert!(reader.read(&InternalEventFilter).is_err());
383 assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
384 }
385
386 #[derive(Default)]
387 struct FakeSource {
388 events: VecDeque<InternalEvent>,
389 error: Option<io::Error>,
390 }
391
392 impl FakeSource {
393 fn new(events: &[InternalEvent]) -> FakeSource {
394 FakeSource {
395 events: events.to_vec().into(),
396 error: Some(io::Error::new(io::ErrorKind::Other, "")),
397 }
398 }
399
400 fn with_events(events: &[InternalEvent]) -> FakeSource {
401 FakeSource {
402 events: events.to_vec().into(),
403 error: None,
404 }
405 }
406 }
407
408 impl EventSource for FakeSource {
409 fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
410 // Return error if set in case there's just one remaining event
411 if self.events.len() == 1 {
412 if let Some(error) = self.error.take() {
413 return Err(error);
414 }
415 }
416
417 // Return all events from the queue
418 if let Some(event) = self.events.pop_front() {
419 return Ok(Some(event));
420 }
421
422 // Return error if there're no more events
423 if let Some(error) = self.error.take() {
424 return Err(error);
425 }
426
427 // Timeout
428 Ok(None)
429 }
430
431 #[cfg(feature = "event-stream")]
432 fn waker(&self) -> super::super::sys::Waker {
433 unimplemented!();
434 }
435 }
436}
437