1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3
4use bytes::{Buf, BufMut, BytesMut};
5use std::{cmp, fmt, io, str, usize};
6
/// A [`Decoder`] and [`Encoder`] that frames a byte stream as `\n`-delimited
/// text lines.
///
/// [`Decoder`]: crate::codec::Decoder
/// [`Encoder`]: crate::codec::Encoder
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct LinesCodec {
    /// Offset into the buffer where the next scan for `\n` starts.
    ///
    /// Bytes before this offset were already examined and hold no newline,
    /// so they are not scanned again. For instance, after `decode` sees
    /// `abc` this is `3`; once the buffer has grown to `abcde\n`, only
    /// `de\n` is inspected.
    next_index: usize,

    /// Longest line the decoder will buffer. `usize::MAX` means "no limit":
    /// bytes are accumulated until a `\n` shows up.
    max_length: usize,

    /// `true` while the tail of a line that exceeded `max_length` is being
    /// thrown away.
    is_discarding: bool,
}
29
30impl LinesCodec {
31 /// Returns a `LinesCodec` for splitting up data into lines.
32 ///
33 /// # Note
34 ///
35 /// The returned `LinesCodec` will not have an upper bound on the length
36 /// of a buffered line. See the documentation for [`new_with_max_length`]
37 /// for information on why this could be a potential security risk.
38 ///
39 /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length()
40 pub fn new() -> LinesCodec {
41 LinesCodec {
42 next_index: 0,
43 max_length: usize::MAX,
44 is_discarding: false,
45 }
46 }
47
48 /// Returns a `LinesCodec` with a maximum line length limit.
49 ///
50 /// If this is set, calls to `LinesCodec::decode` will return a
51 /// [`LinesCodecError`] when a line exceeds the length limit. Subsequent calls
52 /// will discard up to `limit` bytes from that line until a newline
53 /// character is reached, returning `None` until the line over the limit
54 /// has been fully discarded. After that point, calls to `decode` will
55 /// function as normal.
56 ///
57 /// # Note
58 ///
59 /// Setting a length limit is highly recommended for any `LinesCodec` which
60 /// will be exposed to untrusted input. Otherwise, the size of the buffer
61 /// that holds the line currently being read is unbounded. An attacker could
62 /// exploit this unbounded buffer by sending an unbounded amount of input
63 /// without any `\n` characters, causing unbounded memory consumption.
64 ///
65 /// [`LinesCodecError`]: crate::codec::LinesCodecError
66 pub fn new_with_max_length(max_length: usize) -> Self {
67 LinesCodec {
68 max_length,
69 ..LinesCodec::new()
70 }
71 }
72
73 /// Returns the maximum line length when decoding.
74 ///
75 /// ```
76 /// use std::usize;
77 /// use tokio_util::codec::LinesCodec;
78 ///
79 /// let codec = LinesCodec::new();
80 /// assert_eq!(codec.max_length(), usize::MAX);
81 /// ```
82 /// ```
83 /// use tokio_util::codec::LinesCodec;
84 ///
85 /// let codec = LinesCodec::new_with_max_length(256);
86 /// assert_eq!(codec.max_length(), 256);
87 /// ```
88 pub fn max_length(&self) -> usize {
89 self.max_length
90 }
91}
92
/// Borrows `buf` as a `&str` after checking that it is valid UTF-8.
///
/// Any validation failure becomes an [`io::Error`] of kind
/// [`io::ErrorKind::InvalidData`]; the position details of the underlying
/// `Utf8Error` are dropped.
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Unable to decode input as UTF8",
        )),
    }
}
97
/// Strips one trailing `\r` from `s`, if there is one.
///
/// At most a single byte is removed, so `a\r\r` becomes `a\r`.
fn without_carriage_return(s: &[u8]) -> &[u8] {
    match s.split_last() {
        Some((&b'\r', rest)) => rest,
        _ => s,
    }
}
105
106impl Decoder for LinesCodec {
107 type Item = String;
108 type Error = LinesCodecError;
109
110 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
111 loop {
112 // Determine how far into the buffer we'll search for a newline. If
113 // there's no max_length set, we'll read to the end of the buffer.
114 let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
115
116 let newline_offset = buf[self.next_index..read_to]
117 .iter()
118 .position(|b| *b == b'\n');
119
120 match (self.is_discarding, newline_offset) {
121 (true, Some(offset)) => {
122 // If we found a newline, discard up to that offset and
123 // then stop discarding. On the next iteration, we'll try
124 // to read a line normally.
125 buf.advance(offset + self.next_index + 1);
126 self.is_discarding = false;
127 self.next_index = 0;
128 }
129 (true, None) => {
130 // Otherwise, we didn't find a newline, so we'll discard
131 // everything we read. On the next iteration, we'll continue
132 // discarding up to max_len bytes unless we find a newline.
133 buf.advance(read_to);
134 self.next_index = 0;
135 if buf.is_empty() {
136 return Ok(None);
137 }
138 }
139 (false, Some(offset)) => {
140 // Found a line!
141 let newline_index = offset + self.next_index;
142 self.next_index = 0;
143 let line = buf.split_to(newline_index + 1);
144 let line = &line[..line.len() - 1];
145 let line = without_carriage_return(line);
146 let line = utf8(line)?;
147 return Ok(Some(line.to_string()));
148 }
149 (false, None) if buf.len() > self.max_length => {
150 // Reached the maximum length without finding a
151 // newline, return an error and start discarding on the
152 // next call.
153 self.is_discarding = true;
154 return Err(LinesCodecError::MaxLineLengthExceeded);
155 }
156 (false, None) => {
157 // We didn't find a line or reach the length limit, so the next
158 // call will resume searching at the current offset.
159 self.next_index = read_to;
160 return Ok(None);
161 }
162 }
163 }
164 }
165
166 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
167 Ok(match self.decode(buf)? {
168 Some(frame) => Some(frame),
169 None => {
170 // No terminating newline - return remaining data, if any
171 if buf.is_empty() || buf == &b"\r"[..] {
172 None
173 } else {
174 let line = buf.split_to(buf.len());
175 let line = without_carriage_return(&line);
176 let line = utf8(line)?;
177 self.next_index = 0;
178 Some(line.to_string())
179 }
180 }
181 })
182 }
183}
184
185impl<T> Encoder<T> for LinesCodec
186where
187 T: AsRef<str>,
188{
189 type Error = LinesCodecError;
190
191 fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
192 let line: &str = line.as_ref();
193 buf.reserve(additional:line.len() + 1);
194 buf.put(src:line.as_bytes());
195 buf.put_u8(b'\n');
196 Ok(())
197 }
198}
199
200impl Default for LinesCodec {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
/// Failure reported by `LinesCodec` while encoding or decoding a line.
#[derive(Debug)]
pub enum LinesCodecError {
    /// A line grew past the configured maximum length before a newline
    /// was seen.
    MaxLineLengthExceeded,
    /// A wrapped I/O error (this also carries invalid-UTF-8 failures).
    Io(io::Error),
}
214
215impl fmt::Display for LinesCodecError {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 match self {
218 LinesCodecError::MaxLineLengthExceeded => write!(f, "max line length exceeded"),
219 LinesCodecError::Io(e: &Error) => write!(f, "{}", e),
220 }
221 }
222}
223
224impl From<io::Error> for LinesCodecError {
225 fn from(e: io::Error) -> LinesCodecError {
226 LinesCodecError::Io(e)
227 }
228}
229
230impl std::error::Error for LinesCodecError {}
231