1 | use std::{collections::vec_deque::VecDeque, io, time::Duration};
2 |
3 | #[cfg (unix)]
4 | use crate::event::source::unix::UnixInternalEventSource;
5 | #[cfg (windows)]
6 | use crate::event::source::windows::WindowsEventSource;
7 | #[cfg (feature = "event-stream" )]
8 | use crate::event::sys::Waker;
9 | use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
10 |
11 | /// Can be used to read `InternalEvent`s.
12 | pub(crate) struct InternalEventReader {
13 | events: VecDeque<InternalEvent>,
14 | source: Option<Box<dyn EventSource>>,
15 | skipped_events: Vec<InternalEvent>,
16 | }
17 |
18 | impl Default for InternalEventReader {
19 | fn default() -> Self {
20 | #[cfg (windows)]
21 | let source = WindowsEventSource::new();
22 | #[cfg (unix)]
23 | let source: Result = UnixInternalEventSource::new();
24 |
25 | let source: Option> = source.ok().map(|x: UnixInternalEventSource| Box::new(x) as Box<dyn EventSource>);
26 |
27 | InternalEventReader {
28 | source,
29 | events: VecDeque::with_capacity(32),
30 | skipped_events: Vec::with_capacity(32),
31 | }
32 | }
33 | }
34 |
35 | impl InternalEventReader {
36 | /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
37 | #[cfg (feature = "event-stream" )]
38 | pub(crate) fn waker(&self) -> Waker {
39 | self.source.as_ref().expect("reader source not set" ).waker()
40 | }
41 |
42 | pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
43 | where
44 | F: Filter,
45 | {
46 | for event in &self.events {
47 | if filter.eval(event) {
48 | return Ok(true);
49 | }
50 | }
51 |
52 | let event_source = match self.source.as_mut() {
53 | Some(source) => source,
54 | None => {
55 | return Err(std::io::Error::new(
56 | std::io::ErrorKind::Other,
57 | "Failed to initialize input reader" ,
58 | ))
59 | }
60 | };
61 |
62 | let poll_timeout = PollTimeout::new(timeout);
63 |
64 | loop {
65 | let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
66 | Ok(None) => None,
67 | Ok(Some(event)) => {
68 | if filter.eval(&event) {
69 | Some(event)
70 | } else {
71 | self.skipped_events.push(event);
72 | None
73 | }
74 | }
75 | Err(e) => {
76 | if e.kind() == io::ErrorKind::Interrupted {
77 | return Ok(false);
78 | }
79 |
80 | return Err(e);
81 | }
82 | };
83 |
84 | if poll_timeout.elapsed() || maybe_event.is_some() {
85 | self.events.extend(self.skipped_events.drain(..));
86 |
87 | if let Some(event) = maybe_event {
88 | self.events.push_front(event);
89 | return Ok(true);
90 | }
91 |
92 | return Ok(false);
93 | }
94 | }
95 | }
96 |
97 | pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
98 | where
99 | F: Filter,
100 | {
101 | let mut skipped_events = VecDeque::new();
102 |
103 | loop {
104 | while let Some(event) = self.events.pop_front() {
105 | if filter.eval(&event) {
106 | while let Some(event) = skipped_events.pop_front() {
107 | self.events.push_back(event);
108 | }
109 |
110 | return Ok(event);
111 | } else {
112 | // We can not directly write events back to `self.events`.
113 | // If we did, we would put our self's into an endless loop
114 | // that would enqueue -> dequeue -> enqueue etc.
115 | // This happens because `poll` in this function will always return true if there are events in it's.
116 | // And because we just put the non-fulfilling event there this is going to be the case.
117 | // Instead we can store them into the temporary buffer,
118 | // and then when the filter is fulfilled write all events back in order.
119 | skipped_events.push_back(event);
120 | }
121 | }
122 |
123 | let _ = self.poll(None, filter)?;
124 | }
125 | }
126 | }
127 |
128 | #[cfg (test)]
129 | mod tests {
130 | use std::io;
131 | use std::{collections::VecDeque, time::Duration};
132 |
133 | #[cfg (unix)]
134 | use super::super::filter::CursorPositionFilter;
135 | use super::{
136 | super::{filter::InternalEventFilter, Event},
137 | EventSource, InternalEvent, InternalEventReader,
138 | };
139 |
140 | #[test ]
141 | fn test_poll_fails_without_event_source() {
142 | let mut reader = InternalEventReader {
143 | events: VecDeque::new(),
144 | source: None,
145 | skipped_events: Vec::with_capacity(32),
146 | };
147 |
148 | assert!(reader.poll(None, &InternalEventFilter).is_err());
149 | assert!(reader
150 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
151 | .is_err());
152 | assert!(reader
153 | .poll(Some(Duration::from_secs(10)), &InternalEventFilter)
154 | .is_err());
155 | }
156 |
157 | #[test ]
158 | fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
159 | let mut reader = InternalEventReader {
160 | events: vec![InternalEvent::Event(Event::Resize(10, 10))].into(),
161 | source: None,
162 | skipped_events: Vec::with_capacity(32),
163 | };
164 |
165 | assert!(reader.poll(None, &InternalEventFilter).unwrap());
166 | }
167 |
168 | #[test ]
169 | #[cfg (unix)]
170 | fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
171 | let mut reader = InternalEventReader {
172 | events: vec![
173 | InternalEvent::Event(Event::Resize(10, 10)),
174 | InternalEvent::CursorPosition(10, 20),
175 | ]
176 | .into(),
177 | source: None,
178 | skipped_events: Vec::with_capacity(32),
179 | };
180 |
181 | assert!(reader.poll(None, &CursorPositionFilter).unwrap());
182 | }
183 |
184 | #[test ]
185 | fn test_read_returns_matching_event_in_queue_at_front() {
186 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
187 |
188 | let mut reader = InternalEventReader {
189 | events: vec![EVENT].into(),
190 | source: None,
191 | skipped_events: Vec::with_capacity(32),
192 | };
193 |
194 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
195 | }
196 |
197 | #[test ]
198 | #[cfg (unix)]
199 | fn test_read_returns_matching_event_in_queue_at_back() {
200 | const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
201 |
202 | let mut reader = InternalEventReader {
203 | events: vec![InternalEvent::Event(Event::Resize(10, 10)), CURSOR_EVENT].into(),
204 | source: None,
205 | skipped_events: Vec::with_capacity(32),
206 | };
207 |
208 | assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
209 | }
210 |
211 | #[test ]
212 | #[cfg (unix)]
213 | fn test_read_does_not_consume_skipped_event() {
214 | const SKIPPED_EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
215 | const CURSOR_EVENT: InternalEvent = InternalEvent::CursorPosition(10, 20);
216 |
217 | let mut reader = InternalEventReader {
218 | events: vec![SKIPPED_EVENT, CURSOR_EVENT].into(),
219 | source: None,
220 | skipped_events: Vec::with_capacity(32),
221 | };
222 |
223 | assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR_EVENT);
224 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), SKIPPED_EVENT);
225 | }
226 |
227 | #[test ]
228 | fn test_poll_timeouts_if_source_has_no_events() {
229 | let source = FakeSource::default();
230 |
231 | let mut reader = InternalEventReader {
232 | events: VecDeque::new(),
233 | source: Some(Box::new(source)),
234 | skipped_events: Vec::with_capacity(32),
235 | };
236 |
237 | assert!(!reader
238 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
239 | .unwrap());
240 | }
241 |
242 | #[test ]
243 | fn test_poll_returns_true_if_source_has_at_least_one_event() {
244 | let source = FakeSource::with_events(&[InternalEvent::Event(Event::Resize(10, 10))]);
245 |
246 | let mut reader = InternalEventReader {
247 | events: VecDeque::new(),
248 | source: Some(Box::new(source)),
249 | skipped_events: Vec::with_capacity(32),
250 | };
251 |
252 | assert!(reader.poll(None, &InternalEventFilter).unwrap());
253 | assert!(reader
254 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
255 | .unwrap());
256 | }
257 |
258 | #[test ]
259 | fn test_reads_returns_event_if_source_has_at_least_one_event() {
260 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
261 |
262 | let source = FakeSource::with_events(&[EVENT]);
263 |
264 | let mut reader = InternalEventReader {
265 | events: VecDeque::new(),
266 | source: Some(Box::new(source)),
267 | skipped_events: Vec::with_capacity(32),
268 | };
269 |
270 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
271 | }
272 |
273 | #[test ]
274 | fn test_read_returns_events_if_source_has_events() {
275 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
276 |
277 | let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
278 |
279 | let mut reader = InternalEventReader {
280 | events: VecDeque::new(),
281 | source: Some(Box::new(source)),
282 | skipped_events: Vec::with_capacity(32),
283 | };
284 |
285 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
286 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
287 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
288 | }
289 |
290 | #[test ]
291 | fn test_poll_returns_false_after_all_source_events_are_consumed() {
292 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
293 |
294 | let source = FakeSource::with_events(&[EVENT, EVENT, EVENT]);
295 |
296 | let mut reader = InternalEventReader {
297 | events: VecDeque::new(),
298 | source: Some(Box::new(source)),
299 | skipped_events: Vec::with_capacity(32),
300 | };
301 |
302 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
303 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
304 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
305 | assert!(!reader
306 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
307 | .unwrap());
308 | }
309 |
310 | #[test ]
311 | fn test_poll_propagates_error() {
312 | let mut reader = InternalEventReader {
313 | events: VecDeque::new(),
314 | source: Some(Box::new(FakeSource::new(&[]))),
315 | skipped_events: Vec::with_capacity(32),
316 | };
317 |
318 | assert_eq!(
319 | reader
320 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
321 | .err()
322 | .map(|e| format!(" {:?}" , &e.kind())),
323 | Some(format!(" {:?}" , io::ErrorKind::Other))
324 | );
325 | }
326 |
327 | #[test ]
328 | fn test_read_propagates_error() {
329 | let mut reader = InternalEventReader {
330 | events: VecDeque::new(),
331 | source: Some(Box::new(FakeSource::new(&[]))),
332 | skipped_events: Vec::with_capacity(32),
333 | };
334 |
335 | assert_eq!(
336 | reader
337 | .read(&InternalEventFilter)
338 | .err()
339 | .map(|e| format!(" {:?}" , &e.kind())),
340 | Some(format!(" {:?}" , io::ErrorKind::Other))
341 | );
342 | }
343 |
344 | #[test ]
345 | fn test_poll_continues_after_error() {
346 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
347 |
348 | let source = FakeSource::new(&[EVENT, EVENT]);
349 |
350 | let mut reader = InternalEventReader {
351 | events: VecDeque::new(),
352 | source: Some(Box::new(source)),
353 | skipped_events: Vec::with_capacity(32),
354 | };
355 |
356 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
357 | assert!(reader.read(&InternalEventFilter).is_err());
358 | assert!(reader
359 | .poll(Some(Duration::from_secs(0)), &InternalEventFilter)
360 | .unwrap());
361 | }
362 |
363 | #[test ]
364 | fn test_read_continues_after_error() {
365 | const EVENT: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));
366 |
367 | let source = FakeSource::new(&[EVENT, EVENT]);
368 |
369 | let mut reader = InternalEventReader {
370 | events: VecDeque::new(),
371 | source: Some(Box::new(source)),
372 | skipped_events: Vec::with_capacity(32),
373 | };
374 |
375 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
376 | assert!(reader.read(&InternalEventFilter).is_err());
377 | assert_eq!(reader.read(&InternalEventFilter).unwrap(), EVENT);
378 | }
379 |
380 | #[derive (Default)]
381 | struct FakeSource {
382 | events: VecDeque<InternalEvent>,
383 | error: Option<io::Error>,
384 | }
385 |
386 | impl FakeSource {
387 | fn new(events: &[InternalEvent]) -> FakeSource {
388 | FakeSource {
389 | events: events.to_vec().into(),
390 | error: Some(io::Error::new(io::ErrorKind::Other, "" )),
391 | }
392 | }
393 |
394 | fn with_events(events: &[InternalEvent]) -> FakeSource {
395 | FakeSource {
396 | events: events.to_vec().into(),
397 | error: None,
398 | }
399 | }
400 | }
401 |
402 | impl EventSource for FakeSource {
403 | fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
404 | // Return error if set in case there's just one remaining event
405 | if self.events.len() == 1 {
406 | if let Some(error) = self.error.take() {
407 | return Err(error);
408 | }
409 | }
410 |
411 | // Return all events from the queue
412 | if let Some(event) = self.events.pop_front() {
413 | return Ok(Some(event));
414 | }
415 |
416 | // Return error if there're no more events
417 | if let Some(error) = self.error.take() {
418 | return Err(error);
419 | }
420 |
421 | // Timeout
422 | Ok(None)
423 | }
424 |
425 | #[cfg (feature = "event-stream" )]
426 | fn waker(&self) -> super::super::sys::Waker {
427 | unimplemented!();
428 | }
429 | }
430 | }
431 | |