1 | use crate::codec::decoder::Decoder; |
2 | use crate::codec::encoder::Encoder; |
3 | |
4 | use bytes::{Buf, BufMut, BytesMut}; |
5 | use std::{cmp, fmt, io, str, usize}; |
6 | |
/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LinesCodec {
    // Index of the next byte in the buffer to examine for a `\n` character.
    // This is used to avoid re-scanning bytes that were already searched.
    // For example, if `decode` was called with `abc`, it would hold `3`,
    // because that is the next index to examine.
    // The next time `decode` is called with `abcde\n`, the method will
    // only look at `de\n` before returning.
    next_index: usize,

    /// The maximum length for a given line. If `usize::MAX`, lines will be
    /// read until a `\n` character is reached.
    max_length: usize,

    /// Are we currently discarding the remainder of a line which was over
    /// the length limit?
    is_discarding: bool,
}
29 | |
30 | impl LinesCodec { |
31 | /// Returns a `LinesCodec` for splitting up data into lines. |
32 | /// |
33 | /// # Note |
34 | /// |
35 | /// The returned `LinesCodec` will not have an upper bound on the length |
36 | /// of a buffered line. See the documentation for [`new_with_max_length`] |
37 | /// for information on why this could be a potential security risk. |
38 | /// |
39 | /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length() |
40 | pub fn new() -> LinesCodec { |
41 | LinesCodec { |
42 | next_index: 0, |
43 | max_length: usize::MAX, |
44 | is_discarding: false, |
45 | } |
46 | } |
47 | |
48 | /// Returns a `LinesCodec` with a maximum line length limit. |
49 | /// |
50 | /// If this is set, calls to `LinesCodec::decode` will return a |
51 | /// [`LinesCodecError`] when a line exceeds the length limit. Subsequent calls |
52 | /// will discard up to `limit` bytes from that line until a newline |
53 | /// character is reached, returning `None` until the line over the limit |
54 | /// has been fully discarded. After that point, calls to `decode` will |
55 | /// function as normal. |
56 | /// |
57 | /// # Note |
58 | /// |
59 | /// Setting a length limit is highly recommended for any `LinesCodec` which |
60 | /// will be exposed to untrusted input. Otherwise, the size of the buffer |
61 | /// that holds the line currently being read is unbounded. An attacker could |
62 | /// exploit this unbounded buffer by sending an unbounded amount of input |
63 | /// without any `\n` characters, causing unbounded memory consumption. |
64 | /// |
65 | /// [`LinesCodecError`]: crate::codec::LinesCodecError |
66 | pub fn new_with_max_length(max_length: usize) -> Self { |
67 | LinesCodec { |
68 | max_length, |
69 | ..LinesCodec::new() |
70 | } |
71 | } |
72 | |
73 | /// Returns the maximum line length when decoding. |
74 | /// |
75 | /// ``` |
76 | /// use std::usize; |
77 | /// use tokio_util::codec::LinesCodec; |
78 | /// |
79 | /// let codec = LinesCodec::new(); |
80 | /// assert_eq!(codec.max_length(), usize::MAX); |
81 | /// ``` |
82 | /// ``` |
83 | /// use tokio_util::codec::LinesCodec; |
84 | /// |
85 | /// let codec = LinesCodec::new_with_max_length(256); |
86 | /// assert_eq!(codec.max_length(), 256); |
87 | /// ``` |
88 | pub fn max_length(&self) -> usize { |
89 | self.max_length |
90 | } |
91 | } |
92 | |
/// Interprets `buf` as UTF-8 text without copying.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] when `buf`
/// is not valid UTF-8.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    str::from_utf8(buf).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )
    })
}
97 | |
/// Returns `s` with a single trailing `\r` byte removed, if present.
///
/// Only the last byte is inspected; earlier `\r` bytes are left untouched.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    if let Some(&b'\r') = s.last() {
        &s[..s.len() - 1]
    } else {
        s
    }
}
105 | |
106 | impl Decoder for LinesCodec { |
107 | type Item = String; |
108 | type Error = LinesCodecError; |
109 | |
110 | fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> { |
111 | loop { |
112 | // Determine how far into the buffer we'll search for a newline. If |
113 | // there's no max_length set, we'll read to the end of the buffer. |
114 | let read_to = cmp::min(self.max_length.saturating_add(1), buf.len()); |
115 | |
116 | let newline_offset = buf[self.next_index..read_to] |
117 | .iter() |
118 | .position(|b| *b == b' \n' ); |
119 | |
120 | match (self.is_discarding, newline_offset) { |
121 | (true, Some(offset)) => { |
122 | // If we found a newline, discard up to that offset and |
123 | // then stop discarding. On the next iteration, we'll try |
124 | // to read a line normally. |
125 | buf.advance(offset + self.next_index + 1); |
126 | self.is_discarding = false; |
127 | self.next_index = 0; |
128 | } |
129 | (true, None) => { |
130 | // Otherwise, we didn't find a newline, so we'll discard |
131 | // everything we read. On the next iteration, we'll continue |
132 | // discarding up to max_len bytes unless we find a newline. |
133 | buf.advance(read_to); |
134 | self.next_index = 0; |
135 | if buf.is_empty() { |
136 | return Ok(None); |
137 | } |
138 | } |
139 | (false, Some(offset)) => { |
140 | // Found a line! |
141 | let newline_index = offset + self.next_index; |
142 | self.next_index = 0; |
143 | let line = buf.split_to(newline_index + 1); |
144 | let line = &line[..line.len() - 1]; |
145 | let line = without_carriage_return(line); |
146 | let line = utf8(line)?; |
147 | return Ok(Some(line.to_string())); |
148 | } |
149 | (false, None) if buf.len() > self.max_length => { |
150 | // Reached the maximum length without finding a |
151 | // newline, return an error and start discarding on the |
152 | // next call. |
153 | self.is_discarding = true; |
154 | return Err(LinesCodecError::MaxLineLengthExceeded); |
155 | } |
156 | (false, None) => { |
157 | // We didn't find a line or reach the length limit, so the next |
158 | // call will resume searching at the current offset. |
159 | self.next_index = read_to; |
160 | return Ok(None); |
161 | } |
162 | } |
163 | } |
164 | } |
165 | |
166 | fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> { |
167 | Ok(match self.decode(buf)? { |
168 | Some(frame) => Some(frame), |
169 | None => { |
170 | // No terminating newline - return remaining data, if any |
171 | if buf.is_empty() || buf == &b" \r" [..] { |
172 | None |
173 | } else { |
174 | let line = buf.split_to(buf.len()); |
175 | let line = without_carriage_return(&line); |
176 | let line = utf8(line)?; |
177 | self.next_index = 0; |
178 | Some(line.to_string()) |
179 | } |
180 | } |
181 | }) |
182 | } |
183 | } |
184 | |
185 | impl<T> Encoder<T> for LinesCodec |
186 | where |
187 | T: AsRef<str>, |
188 | { |
189 | type Error = LinesCodecError; |
190 | |
191 | fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> { |
192 | let line: &str = line.as_ref(); |
193 | buf.reserve(additional:line.len() + 1); |
194 | buf.put(src:line.as_bytes()); |
195 | buf.put_u8(b' \n' ); |
196 | Ok(()) |
197 | } |
198 | } |
199 | |
200 | impl Default for LinesCodec { |
201 | fn default() -> Self { |
202 | Self::new() |
203 | } |
204 | } |
205 | |
/// An error occurred while encoding or decoding a line.
#[derive(Debug)]
pub enum LinesCodecError {
    /// The maximum line length was exceeded.
    MaxLineLengthExceeded,
    /// An IO error occurred.
    Io(io::Error),
}
214 | |
215 | impl fmt::Display for LinesCodecError { |
216 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
217 | match self { |
218 | LinesCodecError::MaxLineLengthExceeded => write!(f, "max line length exceeded" ), |
219 | LinesCodecError::Io(e: &Error) => write!(f, " {}" , e), |
220 | } |
221 | } |
222 | } |
223 | |
224 | impl From<io::Error> for LinesCodecError { |
225 | fn from(e: io::Error) -> LinesCodecError { |
226 | LinesCodecError::Io(e) |
227 | } |
228 | } |
229 | |
230 | impl std::error::Error for LinesCodecError {} |
231 | |