| 1 | use std::{collections::vec_deque::VecDeque, io, time::Duration};
|
| 2 |
|
| 3 | #[cfg (unix)]
|
| 4 | use crate::event::source::unix::UnixInternalEventSource;
|
| 5 | #[cfg (windows)]
|
| 6 | use crate::event::source::windows::WindowsEventSource;
|
| 7 | #[cfg (feature = "event-stream" )]
|
| 8 | use crate::event::sys::Waker;
|
| 9 | use crate::event::{filter::Filter, source::EventSource, timeout::PollTimeout, InternalEvent};
|
| 10 |
|
| 11 | /// Can be used to read `InternalEvent`s.
|
| 12 | pub(crate) struct InternalEventReader {
|
| 13 | events: VecDeque<InternalEvent>,
|
| 14 | source: Option<Box<dyn EventSource>>,
|
| 15 | skipped_events: Vec<InternalEvent>,
|
| 16 | }
|
| 17 |
|
| 18 | impl Default for InternalEventReader {
|
| 19 | fn default() -> Self {
|
| 20 | #[cfg (windows)]
|
| 21 | let source = WindowsEventSource::new();
|
| 22 | #[cfg (unix)]
|
| 23 | let source: Result = UnixInternalEventSource::new();
|
| 24 |
|
| 25 | let source: Option> = source.ok().map(|x: UnixInternalEventSource| Box::new(x) as Box<dyn EventSource>);
|
| 26 |
|
| 27 | InternalEventReader {
|
| 28 | source,
|
| 29 | events: VecDeque::with_capacity(32),
|
| 30 | skipped_events: Vec::with_capacity(32),
|
| 31 | }
|
| 32 | }
|
| 33 | }
|
| 34 |
|
| 35 | impl InternalEventReader {
|
| 36 | /// Returns a `Waker` allowing to wake/force the `poll` method to return `Ok(false)`.
|
| 37 | #[cfg (feature = "event-stream" )]
|
| 38 | pub(crate) fn waker(&self) -> Waker {
|
| 39 | self.source.as_ref().expect("reader source not set" ).waker()
|
| 40 | }
|
| 41 |
|
| 42 | pub(crate) fn poll<F>(&mut self, timeout: Option<Duration>, filter: &F) -> io::Result<bool>
|
| 43 | where
|
| 44 | F: Filter,
|
| 45 | {
|
| 46 | for event in &self.events {
|
| 47 | if filter.eval(event) {
|
| 48 | return Ok(true);
|
| 49 | }
|
| 50 | }
|
| 51 |
|
| 52 | let event_source = match self.source.as_mut() {
|
| 53 | Some(source) => source,
|
| 54 | None => {
|
| 55 | return Err(std::io::Error::new(
|
| 56 | std::io::ErrorKind::Other,
|
| 57 | "Failed to initialize input reader" ,
|
| 58 | ))
|
| 59 | }
|
| 60 | };
|
| 61 |
|
| 62 | let poll_timeout = PollTimeout::new(timeout);
|
| 63 |
|
| 64 | loop {
|
| 65 | let maybe_event = match event_source.try_read(poll_timeout.leftover()) {
|
| 66 | Ok(None) => None,
|
| 67 | Ok(Some(event)) => {
|
| 68 | if filter.eval(&event) {
|
| 69 | Some(event)
|
| 70 | } else {
|
| 71 | self.skipped_events.push(event);
|
| 72 | None
|
| 73 | }
|
| 74 | }
|
| 75 | Err(e) => {
|
| 76 | if e.kind() == io::ErrorKind::Interrupted {
|
| 77 | return Ok(false);
|
| 78 | }
|
| 79 |
|
| 80 | return Err(e);
|
| 81 | }
|
| 82 | };
|
| 83 |
|
| 84 | if poll_timeout.elapsed() || maybe_event.is_some() {
|
| 85 | self.events.extend(self.skipped_events.drain(..));
|
| 86 |
|
| 87 | if let Some(event) = maybe_event {
|
| 88 | self.events.push_front(event);
|
| 89 | return Ok(true);
|
| 90 | }
|
| 91 |
|
| 92 | return Ok(false);
|
| 93 | }
|
| 94 | }
|
| 95 | }
|
| 96 |
|
| 97 | pub(crate) fn read<F>(&mut self, filter: &F) -> io::Result<InternalEvent>
|
| 98 | where
|
| 99 | F: Filter,
|
| 100 | {
|
| 101 | let mut skipped_events = VecDeque::new();
|
| 102 |
|
| 103 | loop {
|
| 104 | while let Some(event) = self.events.pop_front() {
|
| 105 | if filter.eval(&event) {
|
| 106 | while let Some(event) = skipped_events.pop_front() {
|
| 107 | self.events.push_back(event);
|
| 108 | }
|
| 109 |
|
| 110 | return Ok(event);
|
| 111 | } else {
|
| 112 | // We can not directly write events back to `self.events`.
|
| 113 | // If we did, we would put our self's into an endless loop
|
| 114 | // that would enqueue -> dequeue -> enqueue etc.
|
| 115 | // This happens because `poll` in this function will always return true if there are events in it's.
|
| 116 | // And because we just put the non-fulfilling event there this is going to be the case.
|
| 117 | // Instead we can store them into the temporary buffer,
|
| 118 | // and then when the filter is fulfilled write all events back in order.
|
| 119 | skipped_events.push_back(event);
|
| 120 | }
|
| 121 | }
|
| 122 |
|
| 123 | let _ = self.poll(None, filter)?;
|
| 124 | }
|
| 125 | }
|
| 126 | }
|
| 127 |
|
#[cfg(test)]
mod tests {
    use std::io;
    use std::{collections::VecDeque, time::Duration};

    #[cfg(unix)]
    use super::super::filter::CursorPositionFilter;
    use super::{super::Event, EventSource, Filter, InternalEvent, InternalEventReader};

    /// Filter that accepts every event.
    #[derive(Debug, Clone)]
    pub(crate) struct InternalEventFilter;

    impl Filter for InternalEventFilter {
        fn eval(&self, _: &InternalEvent) -> bool {
            true
        }
    }

    /// Resize event shared by the tests below.
    const RESIZE: InternalEvent = InternalEvent::Event(Event::Resize(10, 10));

    /// Cursor position event shared by the unix-only tests below.
    #[cfg(unix)]
    const CURSOR: InternalEvent = InternalEvent::CursorPosition(10, 20);

    /// Zero-length timeout used wherever a test must not block.
    fn no_wait() -> Option<Duration> {
        Some(Duration::from_secs(0))
    }

    /// Builds a reader without a source whose queue holds `events`.
    fn queued_reader(events: Vec<InternalEvent>) -> InternalEventReader {
        InternalEventReader {
            events: events.into(),
            source: None,
            skipped_events: Vec::with_capacity(32),
        }
    }

    /// Builds a reader with an empty queue that reads from `source`.
    fn sourced_reader(source: FakeSource) -> InternalEventReader {
        InternalEventReader {
            events: VecDeque::new(),
            source: Some(Box::new(source)),
            skipped_events: Vec::with_capacity(32),
        }
    }

    #[test]
    fn test_poll_fails_without_event_source() {
        let mut reader = queued_reader(Vec::new());

        let timeouts = [None, no_wait(), Some(Duration::from_secs(10))];
        for timeout in &timeouts {
            assert!(reader.poll(*timeout, &InternalEventFilter).is_err());
        }
    }

    #[test]
    fn test_poll_returns_true_for_matching_event_in_queue_at_front() {
        let mut reader = queued_reader(vec![RESIZE]);

        let found = reader.poll(None, &InternalEventFilter).unwrap();
        assert!(found);
    }

    #[test]
    #[cfg(unix)]
    fn test_poll_returns_true_for_matching_event_in_queue_at_back() {
        let mut reader = queued_reader(vec![RESIZE, CURSOR]);

        let found = reader.poll(None, &CursorPositionFilter).unwrap();
        assert!(found);
    }

    #[test]
    fn test_read_returns_matching_event_in_queue_at_front() {
        let mut reader = queued_reader(vec![RESIZE]);

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    #[test]
    #[cfg(unix)]
    fn test_read_returns_matching_event_in_queue_at_back() {
        let mut reader = queued_reader(vec![RESIZE, CURSOR]);

        assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR);
    }

    #[test]
    #[cfg(unix)]
    fn test_read_does_not_consume_skipped_event() {
        let mut reader = queued_reader(vec![RESIZE, CURSOR]);

        // The resize event is skipped by the first read and must still be
        // available to the second one.
        assert_eq!(reader.read(&CursorPositionFilter).unwrap(), CURSOR);
        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    #[test]
    fn test_poll_timeouts_if_source_has_no_events() {
        let mut reader = sourced_reader(FakeSource::default());

        let found = reader.poll(no_wait(), &InternalEventFilter).unwrap();
        assert!(!found);
    }

    #[test]
    fn test_poll_returns_true_if_source_has_at_least_one_event() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE]));

        assert!(reader.poll(None, &InternalEventFilter).unwrap());
        assert!(reader.poll(no_wait(), &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_reads_returns_event_if_source_has_at_least_one_event() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    #[test]
    fn test_read_returns_events_if_source_has_events() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE, RESIZE, RESIZE]));

        for _ in 0..3 {
            assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        }
    }

    #[test]
    fn test_poll_returns_false_after_all_source_events_are_consumed() {
        let mut reader = sourced_reader(FakeSource::with_events(&[RESIZE, RESIZE, RESIZE]));

        for _ in 0..3 {
            assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        }

        let found = reader.poll(no_wait(), &InternalEventFilter).unwrap();
        assert!(!found);
    }

    #[test]
    fn test_poll_propagates_error() {
        let mut reader = sourced_reader(FakeSource::new(&[]));

        let kind = reader
            .poll(no_wait(), &InternalEventFilter)
            .err()
            .map(|e| e.kind());
        assert_eq!(kind, Some(io::ErrorKind::Other));
    }

    #[test]
    fn test_read_propagates_error() {
        let mut reader = sourced_reader(FakeSource::new(&[]));

        let kind = reader.read(&InternalEventFilter).err().map(|e| e.kind());
        assert_eq!(kind, Some(io::ErrorKind::Other));
    }

    #[test]
    fn test_poll_continues_after_error() {
        let mut reader = sourced_reader(FakeSource::new(&[RESIZE, RESIZE]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        assert!(reader.read(&InternalEventFilter).is_err());
        assert!(reader.poll(no_wait(), &InternalEventFilter).unwrap());
    }

    #[test]
    fn test_read_continues_after_error() {
        let mut reader = sourced_reader(FakeSource::new(&[RESIZE, RESIZE]));

        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
        assert!(reader.read(&InternalEventFilter).is_err());
        assert_eq!(reader.read(&InternalEventFilter).unwrap(), RESIZE);
    }

    /// In-memory event source that replays queued events and can fail once.
    #[derive(Default)]
    struct FakeSource {
        /// Events handed out by `try_read`, front first.
        events: VecDeque<InternalEvent>,
        /// Error returned (a single time) by `try_read`.
        error: Option<io::Error>,
    }

    impl FakeSource {
        /// Creates a source that replays `events` and fails once with an
        /// `io::ErrorKind::Other` error.
        fn new(events: &[InternalEvent]) -> FakeSource {
            FakeSource {
                events: events.to_vec().into(),
                error: Some(io::Error::new(io::ErrorKind::Other, "")),
            }
        }

        /// Creates a source that replays `events` and never fails.
        fn with_events(events: &[InternalEvent]) -> FakeSource {
            FakeSource {
                events: events.to_vec().into(),
                error: None,
            }
        }
    }

    impl EventSource for FakeSource {
        fn try_read(&mut self, _timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
            // The pending error fires when at most one event is left: either
            // right before the last event or once the queue is empty.
            if self.events.len() <= 1 {
                if let Some(error) = self.error.take() {
                    return Err(error);
                }
            }

            // Hand out the next queued event; `None` signals a timeout.
            Ok(self.events.pop_front())
        }

        #[cfg(feature = "event-stream")]
        fn waker(&self) -> super::super::sys::Waker {
            unimplemented!();
        }
    }
}
|
| 437 | |