1 | use std::{collections::VecDeque, io, time::Duration};
|
2 |
|
3 | use mio::{unix::SourceFd, Events, Interest, Poll, Token};
|
4 | use signal_hook_mio::v0_8::Signals;
|
5 |
|
6 | #[cfg (feature = "event-stream" )]
|
7 | use crate::event::sys::Waker;
|
8 | use crate::event::{
|
9 | source::EventSource, sys::unix::parse::parse_event, timeout::PollTimeout, Event, InternalEvent,
|
10 | };
|
11 | use crate::terminal::sys::file_descriptor::{tty_fd, FileDesc};
|
12 |
|
// Tokens identifying the event sources registered with the mio `Poll`
// instance. `try_read` dispatches on them to find out which source is ready.

/// Token under which the TTY input file descriptor is registered.
const TTY_TOKEN: Token = Token(0);
/// Token under which the `SIGWINCH` signal source is registered.
const SIGNAL_TOKEN: Token = Token(1);
/// Token under which the `Waker` is registered (`event-stream` feature only).
#[cfg(feature = "event-stream")]
const WAKE_TOKEN: Token = Token(2);
|
18 |
|
// I (@zrzka) wasn't able to read more than 1_022 bytes in a single read when
// testing on macOS/Linux, so a bigger buffer isn't needed: 1 KiB is enough.
const TTY_BUFFER_SIZE: usize = 1_024;
|
23 |
|
/// Event source that multiplexes TTY input, `SIGWINCH` notifications and
/// (with the `event-stream` feature) waker notifications through one mio
/// `Poll` instance.
pub(crate) struct UnixInternalEventSource {
    /// Poll instance all sources below are registered with.
    poll: Poll,
    /// Reusable storage for the readiness events returned by `poll`.
    events: Events,
    /// Turns raw TTY bytes into queued `InternalEvent`s.
    parser: Parser,
    /// Scratch buffer the TTY file descriptor is read into.
    tty_buffer: [u8; TTY_BUFFER_SIZE],
    /// File descriptor the input bytes are read from.
    tty_fd: FileDesc,
    /// Signal source delivering `SIGWINCH`.
    signals: Signals,
    /// Waker registered under `WAKE_TOKEN`; clones are handed out by `waker()`.
    #[cfg(feature = "event-stream")]
    waker: Waker,
}
|
34 |
|
35 | impl UnixInternalEventSource {
|
36 | pub fn new() -> io::Result<Self> {
|
37 | UnixInternalEventSource::from_file_descriptor(tty_fd()?)
|
38 | }
|
39 |
|
40 | pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> io::Result<Self> {
|
41 | let poll = Poll::new()?;
|
42 | let registry = poll.registry();
|
43 |
|
44 | let tty_raw_fd = input_fd.raw_fd();
|
45 | let mut tty_ev = SourceFd(&tty_raw_fd);
|
46 | registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
|
47 |
|
48 | let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?;
|
49 | registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
|
50 |
|
51 | #[cfg (feature = "event-stream" )]
|
52 | let waker = Waker::new(registry, WAKE_TOKEN)?;
|
53 |
|
54 | Ok(UnixInternalEventSource {
|
55 | poll,
|
56 | events: Events::with_capacity(3),
|
57 | parser: Parser::default(),
|
58 | tty_buffer: [0u8; TTY_BUFFER_SIZE],
|
59 | tty_fd: input_fd,
|
60 | signals,
|
61 | #[cfg (feature = "event-stream" )]
|
62 | waker,
|
63 | })
|
64 | }
|
65 | }
|
66 |
|
67 | impl EventSource for UnixInternalEventSource {
|
68 | fn try_read(&mut self, timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
|
69 | if let Some(event) = self.parser.next() {
|
70 | return Ok(Some(event));
|
71 | }
|
72 |
|
73 | let timeout = PollTimeout::new(timeout);
|
74 |
|
75 | loop {
|
76 | if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
|
77 | // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
|
78 | // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
|
79 | // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
|
80 | if e.kind() == io::ErrorKind::Interrupted {
|
81 | continue;
|
82 | } else {
|
83 | return Err(e);
|
84 | }
|
85 | };
|
86 |
|
87 | if self.events.is_empty() {
|
88 | // No readiness events = timeout
|
89 | return Ok(None);
|
90 | }
|
91 |
|
92 | for token in self.events.iter().map(|x| x.token()) {
|
93 | match token {
|
94 | TTY_TOKEN => {
|
95 | loop {
|
96 | match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) {
|
97 | Ok(read_count) => {
|
98 | if read_count > 0 {
|
99 | self.parser.advance(
|
100 | &self.tty_buffer[..read_count],
|
101 | read_count == TTY_BUFFER_SIZE,
|
102 | );
|
103 | }
|
104 | }
|
105 | Err(e) => {
|
106 | // No more data to read at the moment. We will receive another event
|
107 | if e.kind() == io::ErrorKind::WouldBlock {
|
108 | break;
|
109 | }
|
110 | // once more data is available to read.
|
111 | else if e.kind() == io::ErrorKind::Interrupted {
|
112 | continue;
|
113 | }
|
114 | }
|
115 | };
|
116 |
|
117 | if let Some(event) = self.parser.next() {
|
118 | return Ok(Some(event));
|
119 | }
|
120 | }
|
121 | }
|
122 | SIGNAL_TOKEN => {
|
123 | for signal in self.signals.pending() {
|
124 | match signal {
|
125 | signal_hook::consts::SIGWINCH => {
|
126 | // TODO Should we remove tput?
|
127 | //
|
128 | // This can take a really long time, because terminal::size can
|
129 | // launch new process (tput) and then it parses its output. It's
|
130 | // not a really long time from the absolute time point of view, but
|
131 | // it's a really long time from the mio, async-std/tokio executor, ...
|
132 | // point of view.
|
133 | let new_size = crate::terminal::size()?;
|
134 | return Ok(Some(InternalEvent::Event(Event::Resize(
|
135 | new_size.0, new_size.1,
|
136 | ))));
|
137 | }
|
138 | _ => unreachable!("Synchronize signal registration & handling" ),
|
139 | };
|
140 | }
|
141 | }
|
142 | #[cfg (feature = "event-stream" )]
|
143 | WAKE_TOKEN => {
|
144 | return Err(std::io::Error::new(
|
145 | std::io::ErrorKind::Interrupted,
|
146 | "Poll operation was woken up by `Waker::wake`" ,
|
147 | ));
|
148 | }
|
149 | _ => unreachable!("Synchronize Evented handle registration & token handling" ),
|
150 | }
|
151 | }
|
152 |
|
153 | // Processing above can take some time, check if timeout expired
|
154 | if timeout.elapsed() {
|
155 | return Ok(None);
|
156 | }
|
157 | }
|
158 | }
|
159 |
|
160 | #[cfg (feature = "event-stream" )]
|
161 | fn waker(&self) -> Waker {
|
162 | self.waker.clone()
|
163 | }
|
164 | }
|
165 |
|
//
// The following `Parser` structure exists for two reasons:
//
//  * to mimic the anes Parser interface
//  * to move the advancing, parsing, ... stuff out of the `try_read` method
//
/// Accumulates raw input bytes and queues the `InternalEvent`s parsed from them.
#[derive(Debug)]
struct Parser {
    /// Bytes of the sequence currently being parsed.
    buffer: Vec<u8>,
    /// Parsed events that have not been handed out yet, oldest first.
    internal_events: VecDeque<InternalEvent>,
}
|
177 |
|
178 | impl Default for Parser {
|
179 | fn default() -> Self {
|
180 | Parser {
|
181 | // This buffer is used for -> 1 <- ANSI escape sequence. Are we
|
182 | // aware of any ANSI escape sequence that is bigger? Can we make
|
183 | // it smaller?
|
184 | //
|
185 | // Probably not worth spending more time on this as "there's a plan"
|
186 | // to use the anes crate parser.
|
187 | buffer: Vec::with_capacity(256),
|
188 | // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
|
189 | // fit? What is an average sequence length? Let's guess here
|
190 | // and say that the average ANSI escape sequence length is 8 bytes. Thus
|
191 | // the buffer size should be 1024/8=128 to avoid additional allocations
|
192 | // when processing large amounts of data.
|
193 | //
|
194 | // There's no need to make it bigger, because when you look at the `try_read`
|
195 | // method implementation, all events are consumed before the next TTY_BUFFER
|
196 | // is processed -> events pushed.
|
197 | internal_events: VecDeque::with_capacity(128),
|
198 | }
|
199 | }
|
200 | }
|
201 |
|
202 | impl Parser {
|
203 | fn advance(&mut self, buffer: &[u8], more: bool) {
|
204 | for (idx, byte) in buffer.iter().enumerate() {
|
205 | let more = idx + 1 < buffer.len() || more;
|
206 |
|
207 | self.buffer.push(*byte);
|
208 |
|
209 | match parse_event(&self.buffer, more) {
|
210 | Ok(Some(ie)) => {
|
211 | self.internal_events.push_back(ie);
|
212 | self.buffer.clear();
|
213 | }
|
214 | Ok(None) => {
|
215 | // Event can't be parsed, because we don't have enough bytes for
|
216 | // the current sequence. Keep the buffer and process next bytes.
|
217 | }
|
218 | Err(_) => {
|
219 | // Event can't be parsed (not enough parameters, parameter is not a number, ...).
|
220 | // Clear the buffer and continue with another sequence.
|
221 | self.buffer.clear();
|
222 | }
|
223 | }
|
224 | }
|
225 | }
|
226 | }
|
227 |
|
228 | impl Iterator for Parser {
|
229 | type Item = InternalEvent;
|
230 |
|
231 | fn next(&mut self) -> Option<Self::Item> {
|
232 | self.internal_events.pop_front()
|
233 | }
|
234 | }
|
235 | |