1use std::{collections::VecDeque, io, time::Duration};
2
3use mio::{unix::SourceFd, Events, Interest, Poll, Token};
4use signal_hook_mio::v0_8::Signals;
5
6#[cfg(feature = "event-stream")]
7use crate::event::sys::Waker;
8use crate::event::{
9 source::EventSource, sys::unix::parse::parse_event, timeout::PollTimeout, Event, InternalEvent,
10};
11use crate::terminal::sys::file_descriptor::{tty_fd, FileDesc};
12
13// Tokens to identify file descriptor
14const TTY_TOKEN: Token = Token(0);
15const SIGNAL_TOKEN: Token = Token(1);
16#[cfg(feature = "event-stream")]
17const WAKE_TOKEN: Token = Token(2);
18
/// Size, in bytes, of the buffer used for a single read from the TTY.
///
/// @zrzka was not able to read more than 1_022 bytes in one read when testing
/// on macOS/Linux, so a bigger buffer is not needed: 1 KiB is enough.
const TTY_BUFFER_SIZE: usize = 1024;
23
24pub(crate) struct UnixInternalEventSource {
25 poll: Poll,
26 events: Events,
27 parser: Parser,
28 tty_buffer: [u8; TTY_BUFFER_SIZE],
29 tty_fd: FileDesc,
30 signals: Signals,
31 #[cfg(feature = "event-stream")]
32 waker: Waker,
33}
34
35impl UnixInternalEventSource {
36 pub fn new() -> io::Result<Self> {
37 UnixInternalEventSource::from_file_descriptor(tty_fd()?)
38 }
39
40 pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> io::Result<Self> {
41 let poll = Poll::new()?;
42 let registry = poll.registry();
43
44 let tty_raw_fd = input_fd.raw_fd();
45 let mut tty_ev = SourceFd(&tty_raw_fd);
46 registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
47
48 let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?;
49 registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
50
51 #[cfg(feature = "event-stream")]
52 let waker = Waker::new(registry, WAKE_TOKEN)?;
53
54 Ok(UnixInternalEventSource {
55 poll,
56 events: Events::with_capacity(3),
57 parser: Parser::default(),
58 tty_buffer: [0u8; TTY_BUFFER_SIZE],
59 tty_fd: input_fd,
60 signals,
61 #[cfg(feature = "event-stream")]
62 waker,
63 })
64 }
65}
66
67impl EventSource for UnixInternalEventSource {
68 fn try_read(&mut self, timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
69 if let Some(event) = self.parser.next() {
70 return Ok(Some(event));
71 }
72
73 let timeout = PollTimeout::new(timeout);
74
75 loop {
76 if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
77 // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
78 // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
79 // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
80 if e.kind() == io::ErrorKind::Interrupted {
81 continue;
82 } else {
83 return Err(e);
84 }
85 };
86
87 if self.events.is_empty() {
88 // No readiness events = timeout
89 return Ok(None);
90 }
91
92 for token in self.events.iter().map(|x| x.token()) {
93 match token {
94 TTY_TOKEN => {
95 loop {
96 match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
97 Ok(read_count) => {
98 if read_count > 0 {
99 self.parser.advance(
100 &self.tty_buffer[..read_count],
101 read_count == TTY_BUFFER_SIZE,
102 );
103 }
104 }
105 Err(e) => {
106 // No more data to read at the moment. We will receive another event
107 if e.kind() == io::ErrorKind::WouldBlock {
108 break;
109 }
110 // once more data is available to read.
111 else if e.kind() == io::ErrorKind::Interrupted {
112 continue;
113 }
114 }
115 };
116
117 if let Some(event) = self.parser.next() {
118 return Ok(Some(event));
119 }
120 }
121 }
122 SIGNAL_TOKEN => {
123 for signal in self.signals.pending() {
124 match signal {
125 signal_hook::consts::SIGWINCH => {
126 // TODO Should we remove tput?
127 //
128 // This can take a really long time, because terminal::size can
129 // launch new process (tput) and then it parses its output. It's
130 // not a really long time from the absolute time point of view, but
131 // it's a really long time from the mio, async-std/tokio executor, ...
132 // point of view.
133 let new_size = crate::terminal::size()?;
134 return Ok(Some(InternalEvent::Event(Event::Resize(
135 new_size.0, new_size.1,
136 ))));
137 }
138 _ => unreachable!("Synchronize signal registration & handling"),
139 };
140 }
141 }
142 #[cfg(feature = "event-stream")]
143 WAKE_TOKEN => {
144 return Err(std::io::Error::new(
145 std::io::ErrorKind::Interrupted,
146 "Poll operation was woken up by `Waker::wake`",
147 ));
148 }
149 _ => unreachable!("Synchronize Evented handle registration & token handling"),
150 }
151 }
152
153 // Processing above can take some time, check if timeout expired
154 if timeout.elapsed() {
155 return Ok(None);
156 }
157 }
158 }
159
160 #[cfg(feature = "event-stream")]
161 fn waker(&self) -> Waker {
162 self.waker.clone()
163 }
164}
165
166//
167// Following `Parser` structure exists for two reasons:
168//
169// * mimic anes Parser interface
170// * move the advancing, parsing, ... stuff out of the `try_read` method
171//
172#[derive(Debug)]
173struct Parser {
174 buffer: Vec<u8>,
175 internal_events: VecDeque<InternalEvent>,
176}
177
178impl Default for Parser {
179 fn default() -> Self {
180 Parser {
181 // This buffer is used for -> 1 <- ANSI escape sequence. Are we
182 // aware of any ANSI escape sequence that is bigger? Can we make
183 // it smaller?
184 //
185 // Probably not worth spending more time on this as "there's a plan"
186 // to use the anes crate parser.
187 buffer: Vec::with_capacity(256),
188 // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
189 // fit? What is an average sequence length? Let's guess here
190 // and say that the average ANSI escape sequence length is 8 bytes. Thus
191 // the buffer size should be 1024/8=128 to avoid additional allocations
192 // when processing large amounts of data.
193 //
194 // There's no need to make it bigger, because when you look at the `try_read`
195 // method implementation, all events are consumed before the next TTY_BUFFER
196 // is processed -> events pushed.
197 internal_events: VecDeque::with_capacity(128),
198 }
199 }
200}
201
202impl Parser {
203 fn advance(&mut self, buffer: &[u8], more: bool) {
204 for (idx, byte) in buffer.iter().enumerate() {
205 let more = idx + 1 < buffer.len() || more;
206
207 self.buffer.push(*byte);
208
209 match parse_event(&self.buffer, more) {
210 Ok(Some(ie)) => {
211 self.internal_events.push_back(ie);
212 self.buffer.clear();
213 }
214 Ok(None) => {
215 // Event can't be parsed, because we don't have enough bytes for
216 // the current sequence. Keep the buffer and process next bytes.
217 }
218 Err(_) => {
219 // Event can't be parsed (not enough parameters, parameter is not a number, ...).
220 // Clear the buffer and continue with another sequence.
221 self.buffer.clear();
222 }
223 }
224 }
225 }
226}
227
228impl Iterator for Parser {
229 type Item = InternalEvent;
230
231 fn next(&mut self) -> Option<Self::Item> {
232 self.internal_events.pop_front()
233 }
234}
235