| 1 | use std::{collections::VecDeque, io, time::Duration};
|
| 2 |
|
| 3 | use mio::{unix::SourceFd, Events, Interest, Poll, Token};
|
| 4 | use signal_hook_mio::v1_0::Signals;
|
| 5 |
|
| 6 | #[cfg (feature = "event-stream" )]
|
| 7 | use crate::event::sys::Waker;
|
| 8 | use crate::event::{
|
| 9 | source::EventSource, sys::unix::parse::parse_event, timeout::PollTimeout, Event, InternalEvent,
|
| 10 | };
|
| 11 | use crate::terminal::sys::file_descriptor::{tty_fd, FileDesc};
|
| 12 |
|
| 13 | // Tokens to identify file descriptor
|
| 14 | const TTY_TOKEN: Token = Token(0);
|
| 15 | const SIGNAL_TOKEN: Token = Token(1);
|
| 16 | #[cfg (feature = "event-stream" )]
|
| 17 | const WAKE_TOKEN: Token = Token(2);
|
| 18 |
|
| 19 | // I (@zrzka) wasn't able to read more than 1_022 bytes when testing
|
| 20 | // reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes
|
| 21 | // is enough.
|
| 22 | const TTY_BUFFER_SIZE: usize = 1_024;
|
| 23 |
|
| 24 | pub(crate) struct UnixInternalEventSource {
|
| 25 | poll: Poll,
|
| 26 | events: Events,
|
| 27 | parser: Parser,
|
| 28 | tty_buffer: [u8; TTY_BUFFER_SIZE],
|
| 29 | tty_fd: FileDesc<'static>,
|
| 30 | signals: Signals,
|
| 31 | #[cfg (feature = "event-stream" )]
|
| 32 | waker: Waker,
|
| 33 | }
|
| 34 |
|
| 35 | impl UnixInternalEventSource {
|
| 36 | pub fn new() -> io::Result<Self> {
|
| 37 | UnixInternalEventSource::from_file_descriptor(tty_fd()?)
|
| 38 | }
|
| 39 |
|
| 40 | pub(crate) fn from_file_descriptor(input_fd: FileDesc<'static>) -> io::Result<Self> {
|
| 41 | let poll = Poll::new()?;
|
| 42 | let registry = poll.registry();
|
| 43 |
|
| 44 | let tty_raw_fd = input_fd.raw_fd();
|
| 45 | let mut tty_ev = SourceFd(&tty_raw_fd);
|
| 46 | registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?;
|
| 47 |
|
| 48 | let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?;
|
| 49 | registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?;
|
| 50 |
|
| 51 | #[cfg (feature = "event-stream" )]
|
| 52 | let waker = Waker::new(registry, WAKE_TOKEN)?;
|
| 53 |
|
| 54 | Ok(UnixInternalEventSource {
|
| 55 | poll,
|
| 56 | events: Events::with_capacity(3),
|
| 57 | parser: Parser::default(),
|
| 58 | tty_buffer: [0u8; TTY_BUFFER_SIZE],
|
| 59 | tty_fd: input_fd,
|
| 60 | signals,
|
| 61 | #[cfg (feature = "event-stream" )]
|
| 62 | waker,
|
| 63 | })
|
| 64 | }
|
| 65 | }
|
| 66 |
|
| 67 | impl EventSource for UnixInternalEventSource {
|
| 68 | fn try_read(&mut self, timeout: Option<Duration>) -> io::Result<Option<InternalEvent>> {
|
| 69 | if let Some(event) = self.parser.next() {
|
| 70 | return Ok(Some(event));
|
| 71 | }
|
| 72 |
|
| 73 | let timeout = PollTimeout::new(timeout);
|
| 74 |
|
| 75 | loop {
|
| 76 | if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) {
|
| 77 | // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds.
|
| 78 | // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned).
|
| 79 | // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes
|
| 80 | if e.kind() == io::ErrorKind::Interrupted {
|
| 81 | continue;
|
| 82 | } else {
|
| 83 | return Err(e);
|
| 84 | }
|
| 85 | };
|
| 86 |
|
| 87 | if self.events.is_empty() {
|
| 88 | // No readiness events = timeout
|
| 89 | return Ok(None);
|
| 90 | }
|
| 91 |
|
| 92 | for token in self.events.iter().map(|x| x.token()) {
|
| 93 | match token {
|
| 94 | TTY_TOKEN => {
|
| 95 | loop {
|
| 96 | match self.tty_fd.read(&mut self.tty_buffer) {
|
| 97 | Ok(read_count) => {
|
| 98 | if read_count > 0 {
|
| 99 | self.parser.advance(
|
| 100 | &self.tty_buffer[..read_count],
|
| 101 | read_count == TTY_BUFFER_SIZE,
|
| 102 | );
|
| 103 | }
|
| 104 | }
|
| 105 | Err(e) => {
|
| 106 | // No more data to read at the moment. We will receive another event
|
| 107 | if e.kind() == io::ErrorKind::WouldBlock {
|
| 108 | break;
|
| 109 | }
|
| 110 | // once more data is available to read.
|
| 111 | else if e.kind() == io::ErrorKind::Interrupted {
|
| 112 | continue;
|
| 113 | }
|
| 114 | }
|
| 115 | };
|
| 116 |
|
| 117 | if let Some(event) = self.parser.next() {
|
| 118 | return Ok(Some(event));
|
| 119 | }
|
| 120 | }
|
| 121 | }
|
| 122 | SIGNAL_TOKEN => {
|
| 123 | if self.signals.pending().next() == Some(signal_hook::consts::SIGWINCH) {
|
| 124 | // TODO Should we remove tput?
|
| 125 | //
|
| 126 | // This can take a really long time, because terminal::size can
|
| 127 | // launch new process (tput) and then it parses its output. It's
|
| 128 | // not a really long time from the absolute time point of view, but
|
| 129 | // it's a really long time from the mio, async-std/tokio executor, ...
|
| 130 | // point of view.
|
| 131 | let new_size = crate::terminal::size()?;
|
| 132 | return Ok(Some(InternalEvent::Event(Event::Resize(
|
| 133 | new_size.0, new_size.1,
|
| 134 | ))));
|
| 135 | }
|
| 136 | }
|
| 137 | #[cfg (feature = "event-stream" )]
|
| 138 | WAKE_TOKEN => {
|
| 139 | return Err(std::io::Error::new(
|
| 140 | std::io::ErrorKind::Interrupted,
|
| 141 | "Poll operation was woken up by `Waker::wake`" ,
|
| 142 | ));
|
| 143 | }
|
| 144 | _ => unreachable!("Synchronize Evented handle registration & token handling" ),
|
| 145 | }
|
| 146 | }
|
| 147 |
|
| 148 | // Processing above can take some time, check if timeout expired
|
| 149 | if timeout.elapsed() {
|
| 150 | return Ok(None);
|
| 151 | }
|
| 152 | }
|
| 153 | }
|
| 154 |
|
| 155 | #[cfg (feature = "event-stream" )]
|
| 156 | fn waker(&self) -> Waker {
|
| 157 | self.waker.clone()
|
| 158 | }
|
| 159 | }
|
| 160 |
|
| 161 | //
|
| 162 | // Following `Parser` structure exists for two reasons:
|
| 163 | //
|
| 164 | // * mimic anes Parser interface
|
| 165 | // * move the advancing, parsing, ... stuff out of the `try_read` method
|
| 166 | //
|
| 167 | #[derive (Debug)]
|
| 168 | struct Parser {
|
| 169 | buffer: Vec<u8>,
|
| 170 | internal_events: VecDeque<InternalEvent>,
|
| 171 | }
|
| 172 |
|
| 173 | impl Default for Parser {
|
| 174 | fn default() -> Self {
|
| 175 | Parser {
|
| 176 | // This buffer is used for -> 1 <- ANSI escape sequence. Are we
|
| 177 | // aware of any ANSI escape sequence that is bigger? Can we make
|
| 178 | // it smaller?
|
| 179 | //
|
| 180 | // Probably not worth spending more time on this as "there's a plan"
|
| 181 | // to use the anes crate parser.
|
| 182 | buffer: Vec::with_capacity(256),
|
| 183 | // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can
|
| 184 | // fit? What is an average sequence length? Let's guess here
|
| 185 | // and say that the average ANSI escape sequence length is 8 bytes. Thus
|
| 186 | // the buffer size should be 1024/8=128 to avoid additional allocations
|
| 187 | // when processing large amounts of data.
|
| 188 | //
|
| 189 | // There's no need to make it bigger, because when you look at the `try_read`
|
| 190 | // method implementation, all events are consumed before the next TTY_BUFFER
|
| 191 | // is processed -> events pushed.
|
| 192 | internal_events: VecDeque::with_capacity(128),
|
| 193 | }
|
| 194 | }
|
| 195 | }
|
| 196 |
|
| 197 | impl Parser {
|
| 198 | fn advance(&mut self, buffer: &[u8], more: bool) {
|
| 199 | for (idx, byte) in buffer.iter().enumerate() {
|
| 200 | let more = idx + 1 < buffer.len() || more;
|
| 201 |
|
| 202 | self.buffer.push(*byte);
|
| 203 |
|
| 204 | match parse_event(&self.buffer, more) {
|
| 205 | Ok(Some(ie)) => {
|
| 206 | self.internal_events.push_back(ie);
|
| 207 | self.buffer.clear();
|
| 208 | }
|
| 209 | Ok(None) => {
|
| 210 | // Event can't be parsed, because we don't have enough bytes for
|
| 211 | // the current sequence. Keep the buffer and process next bytes.
|
| 212 | }
|
| 213 | Err(_) => {
|
| 214 | // Event can't be parsed (not enough parameters, parameter is not a number, ...).
|
| 215 | // Clear the buffer and continue with another sequence.
|
| 216 | self.buffer.clear();
|
| 217 | }
|
| 218 | }
|
| 219 | }
|
| 220 | }
|
| 221 | }
|
| 222 |
|
| 223 | impl Iterator for Parser {
|
| 224 | type Item = InternalEvent;
|
| 225 |
|
| 226 | fn next(&mut self) -> Option<Self::Item> {
|
| 227 | self.internal_events.pop_front()
|
| 228 | }
|
| 229 | }
|
| 230 | |