1 | use std::{collections::vec_deque::VecDeque, io, time::Duration};
|
2 |
|
3 | #[cfg (unix)]
|
4 | use crate::event::source::unix::UnixInternalEventSource;
|
5 | #[cfg (windows)]
|
6 | use crate::event::source::windows::WindowsEventSource;
|
7 | #[cfg (feature = "event-stream" )]
|
8 | use crate::event::sys::Waker;
|
9 | use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
|
10 |
|
11 | /// Can be used to read `InternalEvent`s.
|
12 | pub(crate) struct InternalEventReader {
|
13 | events: VecDeque<InternalEvent>,
|
14 | source: Option<Box<dyn EventSource>>,
|
15 | skipped_events: Vec<InternalEvent>,
|
16 | }
|
17 |
|
18 | impl Default for InternalEventReader {
|
19 | fn default() -> Self {
|
20 | #[cfg (windows)]
|
21 | let source = WindowsEventSource::new();
|
22 | #[cfg (unix)]
|
23 | let source: Result = UnixInternalEventSource::new();
|
24 |
|
25 | let source: Option> = source.ok().map(|x: UnixInternalEventSource| Box::new(x) as Box<dyn EventSource>);
|
26 |
|
27 | InternalEventReader {
|
28 | source,
|
29 | events: VecDeque::with_capacity(32),
|
30 | skipped_events: Vec::with_capacity(32),
|
31 | }
|
32 | }
|
33 | }
|
34 |
|
35 | impl InternalEventReader {
|
36 | /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
|
37 | #[cfg (feature = "event-stream" )]
|
38 | pub(crate) fn waker(&self) -> Waker {
|
39 | self.source.as_ref().expect("reader source not set" ).waker()
|
40 | }
|
41 |
|
42 | pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
|
43 | where
|
44 | F: Filter,
|
45 | {
|
46 | for event in &self.events {
|
47 | if filter.eval(event) {
|
48 | return Ok(true);
|
49 | }
|
50 | }
|
51 |
|
52 | let event_source = match self.source.as_mut() {
|
53 | Some(source) => source,
|
54 | None => {
|
55 | return Err(std::io::Error::new(
|
56 | std::io::ErrorKind::Other,
|
57 | "Failed to initialize input reader" ,
|
58 | ))
|
59 | }
|
60 | };
|
61 |
|
62 | let poll_timeout = PollTimeout::new(timeout);
|
63 |
|
64 | loop {
|
65 | let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
|
66 | Ok(None) => None,
|
67 | Ok(Some(event)) => {
|
68 | if filter.eval(&event) {
|
69 | Some(event)
|
70 | } else {
|
71 | self.skipped_events.push(event);
|
72 | None
|
73 | }
|
74 | }
|
75 | Err(e) => {
|
76 | if e.kind() == io::ErrorKind::Interrupted {
|
77 | return Ok(false);
|
78 | }
|
79 |
|
80 | return Err(e);
|
81 | }
|
82 | };
|
83 |
|
84 | if poll_timeout.elapsed() || maybe_event.is_some() {
|
85 | self.events.extend(self.skipped_events.drain(..));
|
86 |
|
87 | if let Some(event) = maybe_event {
|
88 | self.events.push_front(event);
|
89 | return Ok(true);
|
90 | }
|
91 |
|
92 | return Ok(false);
|
93 | }
|
94 | }
|
95 | }
|
96 |
|
97 | pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
|
98 | where
|
99 | F: Filter,
|
100 | {
|
101 | let mut skipped_events = VecDeque::new();
|
102 |
|
103 | loop {
|
104 | while let Some(event) = self.events.pop_front() {
|
105 | if filter.eval(&event) {
|
106 | while let Some(event) = skipped_events.pop_front() {
|
107 | self.events.push_back(event);
|
108 | }
|
109 |
|
110 | return Ok(event);
|
111 | } else {
|
112 | // We can not directly write events back to `self.events`.
|
113 | // If we did, we would put our self's into an endless loop
|
114 | // that would enqueue -> dequeue -> enqueue etc.
|
115 | // This happens because `poll` in this function will always return true if there are events in it's.
|
116 | // And because we just put the non-fulfilling event there this is going to be the case.
|
117 | // Instead we can store them into the temporary buffer,
|
118 | // and then when the filter is fulfilled write all events back in order.
|
119 | skipped_events.push_back(event);
|
120 | }
|
121 | }
|
122 |
|
123 | let _ = self.poll(None, filter)?;
|
124 | }
|
125 | }
|
126 | }
|
127 |
|
128 | #[cfg (test)]
|
129 | mod tests {
|
130 | use std::io;
|
131 | use std::{collections::VecDeque, time::Duration};
|
132 |
|
133 | #[cfg (unix)]
|
134 | use super::super::filter::CursorPositionFilter;
|
135 | use super::{
|
136 | super::{filter::InternalEventFilter, Event},
|
137 | EventSource, InternalEvent, InternalEventReader,
|
138 | };
|
139 |
|
140 | #[test ]
|
141 | fn test_poll_fails_without_event_source() {
|
142 | let mut reader = InternalEventReader {
|
143 | events: VecDeque::new(),
|
144 | source: None,
|
145 | skipped_events: Vec::with_capacity(32),
|
146 | };
|
147 |
|
148 | assert!(reader.poll(None, &InternalEventFilter).is_err());
|
149 | assert!(reader
|
150 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
|
151 | .is_err());
|
152 | assert!(reader
|
153 | .poll(Some(Duration::from_secs(10)), &InternalEventFilter)
|
154 | .is_err());
|
155 | }
|
156 |
|
157 | #[test ]
|
158 | fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
|
159 | let mut reader = InternalEventReader {
|
160 | events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(),
|
161 | source: None,
|
162 | skipped_events: Vec::with_capacity(32),
|
163 | };
|
164 |
|
165 | assert!(reader.poll(None, &InternalEventFilter).unwrap());
|
166 | }
|
167 |
|
168 | #[test ]
|
169 | #[cfg (unix)]
|
170 | fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
|
171 | let mut reader = InternalEventReader {
|
172 | events: vec![
|
173 | InternalEvent::Event(Event::Resize(10, 10)),
|
174 | InternalEvent::CursorPosition(10, 20),
|
175 | ]
|
176 | .into(),
|
177 | source: None,
|
178 | skipped_events: Vec::with_capacity(32),
|
179 | };
|
180 |
|
181 | assert!(reader.poll(None, &CursorPositionFilter).unwrap());
|
182 | }
|
183 |
|
184 | #[test ]
|
185 | fn test_read_returns_matching_event_in_queue_at_front() {
|
186 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
187 |
|
188 | let mut reader = InternalEventReader {
|
189 | events: vec![EVENT].into(),
|
190 | source: None,
|
191 | skipped_events: Vec::with_capacity(32),
|
192 | };
|
193 |
|
194 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
195 | }
|
196 |
|
197 | #[test ]
|
198 | #[cfg (unix)]
|
199 | fn test_read_returns_matching_event_in_queue_at_back() {
|
200 | const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
|
201 |
|
202 | let mut reader = InternalEventReader {
|
203 | events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(),
|
204 | source: None,
|
205 | skipped_events: Vec::with_capacity(32),
|
206 | };
|
207 |
|
208 | assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
|
209 | }
|
210 |
|
211 | #[test ]
|
212 | #[cfg (unix)]
|
213 | fn test_read_does_not_consume_skipped_event() {
|
214 | const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
215 | const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
|
216 |
|
217 | let mut reader = InternalEventReader {
|
218 | events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(),
|
219 | source: None,
|
220 | skipped_events: Vec::with_capacity(32),
|
221 | };
|
222 |
|
223 | assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
|
224 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT);
|
225 | }
|
226 |
|
227 | #[test ]
|
228 | fn test_poll_timeouts_if_source_has_no_events() {
|
229 | let source = FakeSource::default();
|
230 |
|
231 | let mut reader = InternalEventReader {
|
232 | events: VecDeque::new(),
|
233 | source: Some(Box::new(source)),
|
234 | skipped_events: Vec::with_capacity(32),
|
235 | };
|
236 |
|
237 | assert!(!reader
|
238 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
|
239 | .unwrap());
|
240 | }
|
241 |
|
242 | #[test ]
|
243 | fn test_poll_returns_true_if_source_has_at_least_one_event() {
|
244 | let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]);
|
245 |
|
246 | let mut reader = InternalEventReader {
|
247 | events: VecDeque::new(),
|
248 | source: Some(Box::new(source)),
|
249 | skipped_events: Vec::with_capacity(32),
|
250 | };
|
251 |
|
252 | assert!(reader.poll(None, &InternalEventFilter).unwrap());
|
253 | assert!(reader
|
254 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
|
255 | .unwrap());
|
256 | }
|
257 |
|
258 | #[test ]
|
259 | fn test_reads_returns_event_if_source_has_at_least_one_event() {
|
260 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
261 |
|
262 | let source = FakeSource::with_events(&[EVENT]);
|
263 |
|
264 | let mut reader = InternalEventReader {
|
265 | events: VecDeque::new(),
|
266 | source: Some(Box::new(source)),
|
267 | skipped_events: Vec::with_capacity(32),
|
268 | };
|
269 |
|
270 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
271 | }
|
272 |
|
273 | #[test ]
|
274 | fn test_read_returns_events_if_source_has_events() {
|
275 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
276 |
|
277 | let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
|
278 |
|
279 | let mut reader = InternalEventReader {
|
280 | events: VecDeque::new(),
|
281 | source: Some(Box::new(source)),
|
282 | skipped_events: Vec::with_capacity(32),
|
283 | };
|
284 |
|
285 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
286 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
287 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
288 | }
|
289 |
|
290 | #[test ]
|
291 | fn test_poll_returns_false_after_all_source_events_are_consumed() {
|
292 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
293 |
|
294 | let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
|
295 |
|
296 | let mut reader = InternalEventReader {
|
297 | events: VecDeque::new(),
|
298 | source: Some(Box::new(source)),
|
299 | skipped_events: Vec::with_capacity(32),
|
300 | };
|
301 |
|
302 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
303 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
304 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
305 | assert!(!reader
|
306 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
|
307 | .unwrap());
|
308 | }
|
309 |
|
310 | #[test ]
|
311 | fn test_poll_propagates_error() {
|
312 | let mut reader = InternalEventReader {
|
313 | events: VecDeque::new(),
|
314 | source: Some(Box::new(FakeSource::new(&[]))),
|
315 | skipped_events: Vec::with_capacity(32),
|
316 | };
|
317 |
|
318 | assert_eq!(
|
319 | reader
|
320 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
|
321 | .err()
|
322 | .map(|e| format!(" {:?}" , &e.kind())),
|
323 | Some(format!(" {:?}" , io::ErrorKind::Other))
|
324 | );
|
325 | }
|
326 |
|
327 | #[test ]
|
328 | fn test_read_propagates_error() {
|
329 | let mut reader = InternalEventReader {
|
330 | events: VecDeque::new(),
|
331 | source: Some(Box::new(FakeSource::new(&[]))),
|
332 | skipped_events: Vec::with_capacity(32),
|
333 | };
|
334 |
|
335 | assert_eq!(
|
336 | reader
|
337 | .read(&InternalEventFilter)
|
338 | .err()
|
339 | .map(|e| format!(" {:?}" , &e.kind())),
|
340 | Some(format!(" {:?}" , io::ErrorKind::Other))
|
341 | );
|
342 | }
|
343 |
|
344 | #[test ]
|
345 | fn test_poll_continues_after_error() {
|
346 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
347 |
|
348 | let source = FakeSource::new(&[EVENT, EVENT]);
|
349 |
|
350 | let mut reader = InternalEventReader {
|
351 | events: VecDeque::new(),
|
352 | source: Some(Box::new(source)),
|
353 | skipped_events: Vec::with_capacity(32),
|
354 | };
|
355 |
|
356 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
357 | assert!(reader.read(&InternalEventFilter).is_err());
|
358 | assert!(reader
|
359 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
|
360 | .unwrap());
|
361 | }
|
362 |
|
363 | #[test ]
|
364 | fn test_read_continues_after_error() {
|
365 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
|
366 |
|
367 | let source = FakeSource::new(&[EVENT, EVENT]);
|
368 |
|
369 | let mut reader = InternalEventReader {
|
370 | events: VecDeque::new(),
|
371 | source: Some(Box::new(source)),
|
372 | skipped_events: Vec::with_capacity(32),
|
373 | };
|
374 |
|
375 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
376 | assert!(reader.read(&InternalEventFilter).is_err());
|
377 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
|
378 | }
|
379 |
|
380 | #[derive (Default)]
|
381 | struct FakeSource {
|
382 | events: VecDeque<InternalEvent>,
|
383 | error: Option<io::Error>,
|
384 | }
|
385 |
|
386 | impl FakeSource {
|
387 | fn new(events: &[InternalEvent]) -> FakeSource {
|
388 | FakeSource {
|
389 | events: events.to_vec().into(),
|
390 | error: Some(io::Error::new(io::ErrorKind::Other, "" )),
|
391 | }
|
392 | }
|
393 |
|
394 | fn with_events(events: &[InternalEvent]) -> FakeSource {
|
395 | FakeSource {
|
396 | events: events.to_vec().into(),
|
397 | error: None,
|
398 | }
|
399 | }
|
400 | }
|
401 |
|
402 | impl EventSource for FakeSource {
|
403 | fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
|
404 | // Return error if set in case there's just one remaining event
|
405 | if self.events.len() == 1 {
|
406 | if let Some(error) = self.error.take() {
|
407 | return Err(error);
|
408 | }
|
409 | }
|
410 |
|
411 | // Return all events from the queue
|
412 | if let Some(event) = self.events.pop_front() {
|
413 | return Ok(Some(event));
|
414 | }
|
415 |
|
416 | // Return error if there're no more events
|
417 | if let Some(error) = self.error.take() {
|
418 | return Err(error);
|
419 | }
|
420 |
|
421 | // Timeout
|
422 | Ok(None)
|
423 | }
|
424 |
|
425 | #[cfg (feature = "event-stream" )]
|
426 | fn waker(&self) -> super::super::sys::Waker {
|
427 | unimplemented!();
|
428 | }
|
429 | }
|
430 | }
|
431 | |