1 | use std::{collections::vec_deque::VecDeque, io, time::Duration};
|
2 |
|
3 | #[cfg (unix)]
|
4 | use crate::event::source::unix::UnixInternalEventSource;
|
5 | #[cfg (windows)]
|
6 | use crate::event::source::windows::WindowsEventSource;
|
7 | #[cfg (feature = "event-stream" )]
|
8 | use crate::event::sys::Waker;
|
9 | use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
|
10 |
|
/// Can be used to read `InternalEvent`s.
pub(crate) struct InternalEventReader {
    /// Events that were read but not yet handed to a caller. `poll` adds to this
    /// queue and `read` pops from its front.
    events: VecDeque<InternalEvent>,
    /// Where new events come from. `None` when no source could be created
    /// (`Default` discards the construction error); `poll` then returns an error
    /// once it needs to consult the source.
    source: Option<Box<dyn EventSource>>,
    /// Events `poll` pulled from the source that did not satisfy its filter.
    /// They are moved into `events` when that `poll` call finishes with a match
    /// or a timeout.
    skipped_events: Vec<InternalEvent>,
}
|
17 |
|
18 | impl Default for InternalEventReader {
|
19 | fn default() -> Self {
|
20 | #[cfg (windows)]
|
21 | let source = WindowsEventSource::new();
|
22 | #[cfg (unix)]
|
23 | let source: Result = UnixInternalEventSource::new();
|
24 |
|
25 | let source: Option> = source.ok().map(|x: UnixInternalEventSource| Box::new(x) as Box<dyn EventSource>);
|
26 |
|
27 | InternalEventReader {
|
28 | source,
|
29 | events: VecDeque::with_capacity(32),
|
30 | skipped_events: Vec::with_capacity(32),
|
31 | }
|
32 | }
|
33 | }
|
34 |
|
35 | impl InternalEventReader {
|
36 | /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
|
37 | #[cfg (feature = "event-stream" )]
|
38 | pub(crate) fn waker(&self) -> Waker {
|
39 | self.source.as_ref().expect("reader source not set" ).waker()
|
40 | }
|
41 |
|
42 | pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
|
43 | where
|
44 | F: Filter,
|
45 | {
|
46 | for event in &self.events {
|
47 | if filter.eval(event) {
|
48 | return Ok(true);
|
49 | }
|
50 | }
|
51 |
|
52 | let event_source = match self.source.as_mut() {
|
53 | Some(source) => source,
|
54 | None => {
|
55 | return Err(std::io::Error::new(
|
56 | std::io::ErrorKind::Other,
|
57 | "Failed to initialize input reader" ,
|
58 | ))
|
59 | }
|
60 | };
|
61 |
|
62 | let poll_timeout = PollTimeout::new(timeout);
|
63 |
|
64 | loop {
|
65 | let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
|
66 | Ok(None) => None,
|
67 | Ok(Some(event)) => {
|
68 | if filter.eval(&event) {
|
69 | Some(event)
|
70 | } else {
|
71 | self.skipped_events.push(event);
|
72 | None
|
73 | }
|
74 | }
|
75 | Err(e) => {
|
76 | if e.kind() == io::ErrorKind::Interrupted {
|
77 | return Ok(false);
|
78 | }
|
79 |
|
80 | return Err(e);
|
81 | }
|
82 | };
|
83 |
|
84 | if poll_timeout.elapsed() || maybe_event.is_some() {
|
85 | self.events.extend(self.skipped_events.drain(..));
|
86 |
|
87 | if let Some(event) = maybe_event {
|
88 | self.events.push_front(event);
|
89 | return Ok(true);
|
90 | }
|
91 |
|
92 | return Ok(false);
|
93 | }
|
94 | }
|
95 | }
|
96 |
|
97 | pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
|
98 | where
|
99 | F: Filter,
|
100 | {
|
101 | let mut skipped_events = VecDeque::new();
|
102 |
|
103 | loop {
|
104 | while let Some(event) = self.events.pop_front() {
|
105 | if filter.eval(&event) {
|
106 | while let Some(event) = skipped_events.pop_front() {
|
107 | self.events.push_back(event);
|
108 | }
|
109 |
|
110 | return Ok(event);
|
111 | } else {
|
112 | // We can not directly write events back to `self.events`.
|
113 | // If we did, we would put our self's into an endless loop
|
114 | // that would enqueue -> dequeue -> enqueue etc.
|
115 | // This happens because `poll` in this function will always return true if there are events in it's.
|
116 | // And because we just put the non-fulfilling event there this is going to be the case.
|
117 | // Instead we can store them into the temporary buffer,
|
118 | // and then when the filter is fulfilled write all events back in order.
|
119 | skipped_events.push_back(event);
|
120 | }
|
121 | }
|
122 |
|
123 | let _ = self.poll(None, filter)?;
|
124 | }
|
125 | }
|
126 | }
|
127 |
|
#[cfg(test)]
mod tests {
    use std::io;
    use std::{collections::VecDeque, time::Duration};

    #[cfg(unix)]
    use super::super::filter::CursorPositionFilter;
    use super::{super::Event, EventSource, Filter, InternalEvent, InternalEventReader};

    /// Filter that accepts every event.
    #[derive(Debug, Clone)]
    pub(crate) struct InternalEventFilter;

    impl Filter for InternalEventFilter {
        fn eval(&self, _: &InternalEvent) -> bool {
            true
        }
    }

    /// Event used wherever a test needs "some event".
    const RESIZE_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));

    /// Event accepted by `CursorPositionFilter`.
    #[cfg(unix)]
    const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);

    /// A zero-length timeout.
    fn no_wait() -> Option<Duration> {
        Some(Duration::from_secs(0))
    }

    /// Reader without a source whose queue already holds `queued`.
    fn reader_with_queue(queued: Vec<InternalEvent>) -> InternalEventReader {
        InternalEventReader {
            events: queued.into(),
            source: None,
            skipped_events: Vec::with_capacity(32),
        }
    }

    /// Reader with an empty queue that pulls its events from `source`.
    fn reader_with_source(source: FakeSource) -> InternalEventReader {
        InternalEventReader {
            events: VecDeque::new(),
            source: Some(Box::new(source)),
            skipped_events: Vec::with_capacity(32),
        }
    }

    #[test]
    fn test_poll_fails_without_event_source() {
        let mut reader = reader_with_queue(Vec::new());

        for &timeout in &[None, no_wait(), Some(Duration::from_secs(10))] {
            assert!(reader.poll(timeout, &InternalEventFilter).is_err());
        }
    }

    #[test]
    fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
        let mut reader = reader_with_queue(vec![RESIZE_EVENT]);

        assert!(reader.poll(None, &InternalEventFilter).unwrap());
    }

    #[test]
    #[cfg(unix)]
    fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
        let mut reader = reader_with_queue(vec![RESIZE_EVENT, CURSOR_EVENT]);

        assert!(reader.poll(None, &CursorPositionFilter).unwrap());
    }

    #[test]
    fn test_read_returns_matching_event_in_queue_at_front() {
        let mut reader = reader_with_queue(vec![RESIZE_EVENT]);

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
    }

    #[test]
    #[cfg(unix)]
    fn test_read_returns_matching_event_in_queue_at_back() {
        let mut reader = reader_with_queue(vec![RESIZE_EVENT, CURSOR_EVENT]);

        assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
    }

    #[test]
    #[cfg(unix)]
    fn test_read_does_not_consume_skipped_event() {
        let mut reader = reader_with_queue(vec![RESIZE_EVENT, CURSOR_EVENT]);

        assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
    }

    #[test]
    fn test_poll_timeouts_if_source_has_no_events() {
        let mut reader = reader_with_source(FakeSource::default());

        let ready = reader.poll(no_wait(), &InternalEventFilter).unwrap();
        assert!(!ready);
    }

    #[test]
    fn test_poll_returns_true_if_source_has_at_least_one_event() {
        let mut reader = reader_with_source(FakeSource::with_events(&[RESIZE_EVENT]));

        assert!(reader.poll(None, &InternalEventFilter).unwrap());
        assert!(reader.poll(no_wait(), &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_reads_returns_event_if_source_has_at_least_one_event() {
        let mut reader = reader_with_source(FakeSource::with_events(&[RESIZE_EVENT]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
    }

    #[test]
    fn test_read_returns_events_if_source_has_events() {
        let mut reader = reader_with_source(FakeSource::with_events(&[
            RESIZE_EVENT,
            RESIZE_EVENT,
            RESIZE_EVENT,
        ]));

        for _ in 0..3 {
            assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
        }
    }

    #[test]
    fn test_poll_returns_false_after_all_source_events_are_consumed() {
        let mut reader = reader_with_source(FakeSource::with_events(&[
            RESIZE_EVENT,
            RESIZE_EVENT,
            RESIZE_EVENT,
        ]));

        for _ in 0..3 {
            assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
        }

        let ready = reader.poll(no_wait(), &InternalEventFilter).unwrap();
        assert!(!ready);
    }

    #[test]
    fn test_poll_propagates_error() {
        let mut reader = reader_with_source(FakeSource::new(&[]));

        let kind = reader
            .poll(no_wait(), &InternalEventFilter)
            .err()
            .map(|e| format!("{:?}", &e.kind()));

        assert_eq!(kind, Some(format!("{:?}", io::ErrorKind::Other)));
    }

    #[test]
    fn test_read_propagates_error() {
        let mut reader = reader_with_source(FakeSource::new(&[]));

        let kind = reader
            .read(&InternalEventFilter)
            .err()
            .map(|e| format!("{:?}", &e.kind()));

        assert_eq!(kind, Some(format!("{:?}", io::ErrorKind::Other)));
    }

    #[test]
    fn test_poll_continues_after_error() {
        let mut reader = reader_with_source(FakeSource::new(&[RESIZE_EVENT, RESIZE_EVENT]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
        assert!(reader.read(&InternalEventFilter).is_err());
        assert!(reader.poll(no_wait(), &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_read_continues_after_error() {
        let mut reader = reader_with_source(FakeSource::new(&[RESIZE_EVENT, RESIZE_EVENT]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
        assert!(reader.read(&InternalEventFilter).is_err());
        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE_EVENT);
    }

    /// In-memory `EventSource` that replays a fixed list of events and can fail
    /// exactly once.
    #[derive(Default)]
    struct FakeSource {
        events: VecDeque<InternalEvent>,
        error: Option<io::Error>,
    }

    impl FakeSource {
        /// Source that replays `events` and also carries a one-shot error.
        fn new(events: &[InternalEvent]) -> FakeSource {
            FakeSource {
                error: Some(io::Error::new(io::ErrorKind::Other, "")),
                ..FakeSource::with_events(events)
            }
        }

        /// Source that replays `events` and never fails.
        fn with_events(events: &[InternalEvent]) -> FakeSource {
            FakeSource {
                events: events.iter().cloned().collect(),
                error: None,
            }
        }
    }

    impl EventSource for FakeSource {
        fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
            // The pending error (if any) fires when exactly one event is left,
            // or when the queue is already empty.
            if self.events.len() <= 1 {
                if let Some(error) = self.error.take() {
                    return Err(error);
                }
            }

            // Otherwise hand out the next event; an empty queue is a timeout.
            Ok(self.events.pop_front())
        }

        #[cfg(feature = "event-stream")]
        fn waker(&self) -> super::super::sys::Waker {
            unimplemented!()
        }
    }
}
|
437 | |