| 1 | use crate::codec::decoder::Decoder; |
| 2 | use crate::codec::encoder::Encoder; |
| 3 | |
| 4 | use bytes::{Buf, BufMut, BytesMut}; |
| 5 | use std::{cmp, fmt, io, str}; |
| 6 | |
/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
///
/// Lines are terminated by the `\n` byte on every platform. When decoding, a
/// single `\r` sitting directly before the `\n` is stripped as well.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct LinesCodec {
    /// Position in the buffer where the next scan for `\n` begins.
    ///
    /// Everything before this index was already examined by an earlier
    /// `decode` call and holds no `\n`, so it is skipped. For example, after
    /// `decode` sees `abc` this is `3`; if the buffer later reads `abcde\n`,
    /// only `de\n` gets scanned.
    next_index: usize,

    /// Longest permitted line, in bytes. `usize::MAX` means a line is
    /// buffered until its `\n` arrives, however long that takes.
    max_length: usize,

    /// `true` while the remainder of a line that exceeded `max_length` is
    /// being thrown away.
    is_discarding: bool,
}
| 31 | |
| 32 | impl LinesCodec { |
| 33 | /// Returns a `LinesCodec` for splitting up data into lines. |
| 34 | /// |
| 35 | /// # Note |
| 36 | /// |
| 37 | /// The returned `LinesCodec` will not have an upper bound on the length |
| 38 | /// of a buffered line. See the documentation for [`new_with_max_length`] |
| 39 | /// for information on why this could be a potential security risk. |
| 40 | /// |
| 41 | /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length() |
| 42 | pub fn new() -> LinesCodec { |
| 43 | LinesCodec { |
| 44 | next_index: 0, |
| 45 | max_length: usize::MAX, |
| 46 | is_discarding: false, |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | /// Returns a `LinesCodec` with a maximum line length limit. |
| 51 | /// |
| 52 | /// If this is set, calls to `LinesCodec::decode` will return a |
| 53 | /// [`LinesCodecError`] when a line exceeds the length limit. Subsequent calls |
| 54 | /// will discard up to `limit` bytes from that line until a newline |
| 55 | /// character is reached, returning `None` until the line over the limit |
| 56 | /// has been fully discarded. After that point, calls to `decode` will |
| 57 | /// function as normal. |
| 58 | /// |
| 59 | /// # Note |
| 60 | /// |
| 61 | /// Setting a length limit is highly recommended for any `LinesCodec` which |
| 62 | /// will be exposed to untrusted input. Otherwise, the size of the buffer |
| 63 | /// that holds the line currently being read is unbounded. An attacker could |
| 64 | /// exploit this unbounded buffer by sending an unbounded amount of input |
| 65 | /// without any `\n` characters, causing unbounded memory consumption. |
| 66 | /// |
| 67 | /// [`LinesCodecError`]: crate::codec::LinesCodecError |
| 68 | pub fn new_with_max_length(max_length: usize) -> Self { |
| 69 | LinesCodec { |
| 70 | max_length, |
| 71 | ..LinesCodec::new() |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | /// Returns the maximum line length when decoding. |
| 76 | /// |
| 77 | /// ``` |
| 78 | /// use std::usize; |
| 79 | /// use tokio_util::codec::LinesCodec; |
| 80 | /// |
| 81 | /// let codec = LinesCodec::new(); |
| 82 | /// assert_eq!(codec.max_length(), usize::MAX); |
| 83 | /// ``` |
| 84 | /// ``` |
| 85 | /// use tokio_util::codec::LinesCodec; |
| 86 | /// |
| 87 | /// let codec = LinesCodec::new_with_max_length(256); |
| 88 | /// assert_eq!(codec.max_length(), 256); |
| 89 | /// ``` |
| 90 | pub fn max_length(&self) -> usize { |
| 91 | self.max_length |
| 92 | } |
| 93 | } |
| 94 | |
/// Borrows `buf` as a `&str` if it is valid UTF-8.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] when `buf`
/// is not valid UTF-8. The underlying `Utf8Error` details are not kept.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )),
    }
}
| 99 | |
/// Returns `s` with a single trailing `\r` removed, if there is one.
///
/// Only the final byte is inspected. `b"a\r\r"` becomes `b"a\r"`, and a `\r`
/// anywhere other than the end is kept.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    match s {
        [body @ .., b'\r'] => body,
        _ => s,
    }
}
| 107 | |
| 108 | impl Decoder for LinesCodec { |
| 109 | type Item = String; |
| 110 | type Error = LinesCodecError; |
| 111 | |
| 112 | fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> { |
| 113 | loop { |
| 114 | // Determine how far into the buffer we'll search for a newline. If |
| 115 | // there's no max_length set, we'll read to the end of the buffer. |
| 116 | let read_to = cmp::min(self.max_length.saturating_add(1), buf.len()); |
| 117 | |
| 118 | let newline_offset = buf[self.next_index..read_to] |
| 119 | .iter() |
| 120 | .position(|b| *b == b' \n' ); |
| 121 | |
| 122 | match (self.is_discarding, newline_offset) { |
| 123 | (true, Some(offset)) => { |
| 124 | // If we found a newline, discard up to that offset and |
| 125 | // then stop discarding. On the next iteration, we'll try |
| 126 | // to read a line normally. |
| 127 | buf.advance(offset + self.next_index + 1); |
| 128 | self.is_discarding = false; |
| 129 | self.next_index = 0; |
| 130 | } |
| 131 | (true, None) => { |
| 132 | // Otherwise, we didn't find a newline, so we'll discard |
| 133 | // everything we read. On the next iteration, we'll continue |
| 134 | // discarding up to max_len bytes unless we find a newline. |
| 135 | buf.advance(read_to); |
| 136 | self.next_index = 0; |
| 137 | if buf.is_empty() { |
| 138 | return Ok(None); |
| 139 | } |
| 140 | } |
| 141 | (false, Some(offset)) => { |
| 142 | // Found a line! |
| 143 | let newline_index = offset + self.next_index; |
| 144 | self.next_index = 0; |
| 145 | let line = buf.split_to(newline_index + 1); |
| 146 | let line = &line[..line.len() - 1]; |
| 147 | let line = without_carriage_return(line); |
| 148 | let line = utf8(line)?; |
| 149 | return Ok(Some(line.to_string())); |
| 150 | } |
| 151 | (false, None) if buf.len() > self.max_length => { |
| 152 | // Reached the maximum length without finding a |
| 153 | // newline, return an error and start discarding on the |
| 154 | // next call. |
| 155 | self.is_discarding = true; |
| 156 | return Err(LinesCodecError::MaxLineLengthExceeded); |
| 157 | } |
| 158 | (false, None) => { |
| 159 | // We didn't find a line or reach the length limit, so the next |
| 160 | // call will resume searching at the current offset. |
| 161 | self.next_index = read_to; |
| 162 | return Ok(None); |
| 163 | } |
| 164 | } |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> { |
| 169 | Ok(match self.decode(buf)? { |
| 170 | Some(frame) => Some(frame), |
| 171 | None => { |
| 172 | self.next_index = 0; |
| 173 | // No terminating newline - return remaining data, if any |
| 174 | if buf.is_empty() || buf == &b" \r" [..] { |
| 175 | None |
| 176 | } else { |
| 177 | let line = buf.split_to(buf.len()); |
| 178 | let line = without_carriage_return(&line); |
| 179 | let line = utf8(line)?; |
| 180 | Some(line.to_string()) |
| 181 | } |
| 182 | } |
| 183 | }) |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | impl<T> Encoder<T> for LinesCodec |
| 188 | where |
| 189 | T: AsRef<str>, |
| 190 | { |
| 191 | type Error = LinesCodecError; |
| 192 | |
| 193 | fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> { |
| 194 | let line: &str = line.as_ref(); |
| 195 | buf.reserve(additional:line.len() + 1); |
| 196 | buf.put(src:line.as_bytes()); |
| 197 | buf.put_u8(b' \n' ); |
| 198 | Ok(()) |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | impl Default for LinesCodec { |
| 203 | fn default() -> Self { |
| 204 | Self::new() |
| 205 | } |
| 206 | } |
| 207 | |
/// Error produced by [`LinesCodec`] while encoding or decoding a line.
#[derive(Debug)]
pub enum LinesCodecError {
    /// A line grew past the codec's configured maximum length.
    MaxLineLengthExceeded,
    /// An I/O error, including a decoded line that was not valid UTF-8.
    Io(io::Error),
}
| 216 | |
| 217 | impl fmt::Display for LinesCodecError { |
| 218 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 219 | match self { |
| 220 | LinesCodecError::MaxLineLengthExceeded => write!(f, "max line length exceeded" ), |
| 221 | LinesCodecError::Io(e: &Error) => write!(f, " {e}" ), |
| 222 | } |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | impl From<io::Error> for LinesCodecError { |
| 227 | fn from(e: io::Error) -> LinesCodecError { |
| 228 | LinesCodecError::Io(e) |
| 229 | } |
| 230 | } |
| 231 | |
| 232 | impl std::error::Error for LinesCodecError {} |
| 233 | |